# CSU-MS2-T2 / app.py
# Tingxie's picture
# Prevent Gradio startup crash via deferred DB loading
# 93c8da3 verified
from __future__ import annotations
import sqlite3
import gradio as gr
from infer import ModelInference
from model import ModelCLR
from matchms.importing import load_from_mgf,load_from_msp
import matchms.filtering as msfilters
import numpy as np
from rdkit.Chem import Draw
from rdkit.Chem.Descriptors import ExactMolWt
from rdkit import Chem
import torch
import yaml
import pickle
import subprocess
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
import tempfile
import shutil
import os
import matplotlib.pyplot as plt
import gradio as gr
from typing import Iterable
from gradio.themes.base import Base
from gradio.themes.utils import colors, fonts, sizes
from huggingface_hub import hf_hub_download
import time
import concurrent.futures
class Seafoam(Base):
    """Gradio theme built on ``Base`` with emerald/blue hues and gradient buttons.

    The constructor forwards the hue, size and font choices to ``Base`` and then
    overrides a fixed set of theme variables through ``set``.
    """

    def __init__(
        self,
        *,
        primary_hue: colors.Color | str = colors.emerald,
        secondary_hue: colors.Color | str = colors.blue,
        neutral_hue: colors.Color | str = colors.blue,
        spacing_size: sizes.Size | str = sizes.spacing_md,
        radius_size: sizes.Size | str = sizes.radius_md,
        text_size: sizes.Size | str = sizes.text_lg,
        font: fonts.Font
        | str
        | Iterable[fonts.Font | str] = (
            fonts.GoogleFont("Quicksand"),
            "ui-sans-serif",
            "sans-serif",
        ),
        font_mono: fonts.Font
        | str
        | Iterable[fonts.Font | str] = (
            fonts.GoogleFont("IBM Plex Mono"),
            "ui-monospace",
            "monospace",
        ),
    ):
        # Options understood by the Base theme constructor.
        base_options = {
            "primary_hue": primary_hue,
            "secondary_hue": secondary_hue,
            "neutral_hue": neutral_hue,
            "spacing_size": spacing_size,
            "radius_size": radius_size,
            "text_size": text_size,
            "font": font,
            "font_mono": font_mono,
        }
        super().__init__(**base_options)

        # Theme variable overrides. The light-mode body background is left at
        # the Base default; the striped variant is kept here for reference:
        # body_background_fill="repeating-linear-gradient(45deg, *primary_200, *primary_200 10px, *primary_50 10px, *primary_50 20px)",
        variable_overrides = {
            "body_background_fill_dark": "repeating-linear-gradient(45deg, *primary_800, *primary_800 10px, *primary_900 10px, *primary_900 20px)",
            "button_primary_background_fill": "linear-gradient(90deg, *primary_300, *secondary_400)",
            "button_primary_background_fill_hover": "linear-gradient(90deg, *primary_200, *secondary_300)",
            "button_primary_text_color": "white",
            "button_primary_background_fill_dark": "linear-gradient(90deg, *primary_600, *secondary_800)",
            "slider_color": "*secondary_300",
            "slider_color_dark": "*secondary_600",
            "block_title_text_weight": "600",
            "block_border_width": "3px",
            "block_shadow": "*shadow_drop_lg",
            "button_large_padding": "17px",
            "body_text_color": "#000000",
        }
        super().set(**variable_overrides)
# Theme instance handed to gr.Blocks(theme=...) when the UI is built.
seafoam = Seafoam()
# Raw <style> markup injected into the page through gr.HTML(custom_css).
# It fixes the heights of the upload widgets, the result galleries and the
# two plot elements.
# NOTE(review): the `.file-upload-height` rule also sets `display: none`,
# which would hide any element carrying that class; confirm this is intended.
custom_css = """
<style>
.file-upload-height {
height:330px !important;
display: none;
}
.file-upload-height2 {
height:190px !important;
}
.gallery-height {
height: 350px !important;
}
#custom_plot {
height: 300px !important;
}
#custom_plot2 {
height: 480px !important;
}
</style>
"""
def spectrum_processing(s):
    """Apply the spectrum pre-processing pipeline used before embedding/plotting.

    The spectrum is first passed through ``msfilters.normalize_intensities`` and
    the result is then restricted with ``msfilters.select_by_mz`` to the
    ``mz_from=0`` .. ``mz_to=1500`` window.

    Args:
        s: Spectrum object accepted by the matchms filters.

    Returns:
        Whatever ``msfilters.select_by_mz`` returns for the normalized spectrum.
    """
    normalized = msfilters.normalize_intensities(s)
    return msfilters.select_by_mz(normalized, mz_from=0, mz_to=1500)
def draw_mass_spectrum(peak_data_path):
    """Render the first spectrum of an uploaded .msp file as a stick plot.

    Args:
        peak_data_path: Uploaded file object exposing a ``.name`` path, or
            ``None`` when no file is present (e.g. the upload was cleared).

    Returns:
        A ``matplotlib.figure.Figure`` with one red vertical line per peak, or
        ``None`` when there is no file to draw (which clears the plot output).
    """
    # Nothing to draw without a file; avoids an AttributeError on `.name`.
    if peak_data_path is None:
        return None

    # A Figure created directly (instead of through pyplot) is not registered
    # in pyplot's global figure list, so it is garbage-collected once it has
    # been rendered instead of accumulating on every upload.
    from matplotlib.figure import Figure

    ms2 = list(load_from_msp(peak_data_path.name))[0]
    ms2 = spectrum_processing(ms2)
    mz_values = np.array(ms2.mz)
    intensities = np.array(ms2.intensities)

    fig = Figure(figsize=(6, 3))
    ax = fig.subplots()
    for mz, intensity in zip(mz_values, intensities):
        # `ymax` is a fraction of the axes height, as in the original
        # axvline call.
        ax.axvline(x=mz, ymin=0, ymax=intensity, c='red')
    ax.set_xlabel("m/z")
    ax.set_ylabel("Intensity")
    ax.set_title("Mass Spectrum")
    return fig
def search_structure_from_mass(structureDB, mass, ppm):
    """Select the candidate structures whose mass lies within a ppm window.

    Args:
        structureDB: DataFrame with a ``'MonoisotopicMass'`` column.
        mass: Centre of the mass window.
        ppm: Half-width of the window in parts per million of ``mass``.

    Returns:
        The rows of ``structureDB`` with
        ``mass - mass*ppm/1e6 <= MonoisotopicMass <= mass + mass*ppm/1e6``
        (both bounds inclusive), keeping their original index.
    """
    tolerance = mass * ppm / 10**6
    masses = structureDB['MonoisotopicMass']
    in_window = (masses >= mass - tolerance) & (masses <= mass + tolerance)
    return structureDB[in_window]
# Process-wide SQLite connection; stays None until initialize_db() has run.
conn = None


def initialize_db():
    """Download the structure database once and return a shared connection.

    On the first call the SQLite file is fetched from the Hugging Face dataset
    repo "Tingxie/CSU-MS2-DB" (authenticated with the ``HF_TOKEN`` environment
    variable when set) and opened with ``check_same_thread=False``. Later calls
    return the connection cached in the module-level ``conn``.

    Returns:
        The ``sqlite3.Connection`` stored in ``conn``.
    """
    global conn
    if conn is not None:
        return conn

    print("Starting large file download and DB connection...")
    db_path = hf_hub_download(
        repo_id="Tingxie/CSU-MS2-DB",
        filename="csu_ms2_db.db",
        repo_type="dataset",
        token=os.getenv("HF_TOKEN"),
    )
    conn = sqlite3.connect(db_path, check_same_thread=False)
    print("DB initialization complete.")
    return conn
#dataset_repo = "Tingxie/CSU-MS2-DB"
#db_filename = "csu_ms2_db.db"
#token = os.getenv("HF_TOKEN")
#db_path = hf_hub_download(repo_id=dataset_repo, filename=db_filename, repo_type="dataset", token=token)
#conn = sqlite3.connect(db_path, check_same_thread=False)
# All inference in this app runs on the CPU.
device = 'cpu'

# Checkpoints of the three collision-energy-specific models.
pretrain_model_path_low = 'model/low_energy/checkpoints/model.pth'
pretrain_model_path_median = 'model/median_energy/checkpoints/model.pth'
pretrain_model_path_high = 'model/high_energy/checkpoints/model.pth'

# The low-energy config is used to build all three models.
config_path = "model/low_energy/checkpoints/config.yaml"
# Read the config inside a context manager so the file handle is closed
# instead of being left to the garbage collector.
with open(config_path, "r") as config_file:
    config = yaml.load(config_file, Loader=yaml.FullLoader)

model_low = ModelCLR(**config["model_config"]).to(device)
model_median = ModelCLR(**config["model_config"]).to(device)
model_high = ModelCLR(**config["model_config"]).to(device)

state_dict_low = torch.load(pretrain_model_path_low, map_location=device)
state_dict_median = torch.load(pretrain_model_path_median, map_location=device)
state_dict_high = torch.load(pretrain_model_path_high, map_location=device)

# Load the weights and switch every model to evaluation mode.
model_low.load_state_dict(state_dict_low)
model_low.eval()
model_median.load_state_dict(state_dict_median)
model_median.eval()
model_high.load_state_dict(state_dict_high)
model_high.eval()
def generate_file(file_obj):
    """Copy an uploaded file into ``tmpdir`` and return the path of the copy.

    Args:
        file_obj: Object exposing the source path as ``.name``.

    Returns:
        ``tmpdir`` joined with the base name of the source file.
    """
    # NOTE(review): `tmpdir` is not assigned anywhere in the visible part of
    # this file; it must be defined as a module global before this is called.
    global tmpdir
    source_path = file_obj.name
    shutil.copy(source_path, tmpdir)
    return os.path.join(tmpdir, os.path.basename(source_path))
def _encode_spectrum(model, spec_mzs, spec_intens, num_peaks):
    """Run one model's spectrum branch and return the L2-normalized embedding."""
    spec_tensor, spec_mask = model.ms_encoder(spec_mzs, spec_intens, num_peaks)
    spec_tensor = model.spec_esa(spec_tensor, spec_mask)
    spec_tensor = model.spec_proj(spec_tensor)
    return spec_tensor / spec_tensor.norm(dim=-1, keepdim=True)


def MS2Embedding(spectra):
    """Embed one spectrum with the low-, median- and high-energy models.

    The m/z values are rounded to 4 decimals. Spectra with more than 300 peaks
    keep only their last 300 entries; shorter ones are zero-padded to 300.

    Args:
        spectra: Spectrum object exposing ``mz`` and ``intensities`` arrays.

    Returns:
        Tuple ``(low, median, high)`` of 1-D numpy arrays, one normalized
        embedding per model.
    """
    max_peaks = 300
    mzs = np.around(spectra.mz, decimals=4)
    intens = spectra.intensities
    peak_count = len(mzs)

    if peak_count > max_peaks:
        mzs = mzs[-max_peaks:]
        intens = intens[-max_peaks:]
        peak_count = max_peaks
    else:
        padding = (0, max_peaks - peak_count)
        mzs = np.pad(mzs, padding, mode='constant', constant_values=0)
        intens = np.pad(intens, padding, mode='constant', constant_values=0)

    # Build the (1, 300) batch from a single ndarray; constructing a tensor
    # from a list of ndarrays is slow and triggers a PyTorch warning.
    spec_mzs = torch.tensor(np.asarray(mzs)[np.newaxis, :]).float()
    spec_intens = torch.tensor(np.asarray(intens)[np.newaxis, :]).float()
    num_peaks = torch.LongTensor([peak_count])

    # Pure inference: skip autograd bookkeeping for all three forward passes.
    with torch.no_grad():
        embeddings = [
            _encode_spectrum(model, spec_mzs, spec_intens, num_peaks)
            for model in (model_low, model_median, model_high)
        ]

    low, median, high = (np.array(emb.tolist()[0]) for emb in embeddings)
    return low, median, high
def calculate_cosine_similarity(vector1, vector2):
    """Return the cosine similarity between two vectors as a scalar.

    Each input is reshaped to a single row before being passed to
    ``cosine_similarity``; the only entry of the resulting matrix is returned.
    """
    row_a = vector1.reshape(1, -1)
    row_b = vector2.reshape(1, -1)
    similarity_matrix = cosine_similarity(row_a, row_b)
    return similarity_matrix[0][0]
# Maps the library labels offered in the UI dropdown to the SQLite table that
# stores that library.
_LIBRARY_TABLES = {
    "CSU_MS2_DB": 'ConSSDB',
    "BloodExp: blood exposome database": 'BloodexpDB',
    "ChEBI: products of nature or synthetic products database": 'ChebiDB',
    "ChemFOnt: Biochemical database including primary metabolites, secondary metabolites, natural products, etc": 'ChemfontDB',
    "ContaminantDB: Contaminant data from different online references and databases on contaminants": 'ContaminantdbDB',
    "DrugBank: drug biochemical and pharmacological information database": 'DrugbankDB',
    "ECMDB: database of small molecule metabolites found in or produced by Escherichia coli": 'EcmdbDB',
    "Exposome-Explorer: biomarkers of exposure to environmental risk factors for diseases": 'ExposomeDB',
    "Foodb: food constituent database": 'FoodbDB',
    "HMDB: human metabolome database": 'HmdbDB',
    "KEGG: a collection of small molecules, biopolymers, and other chemical substances": 'KeggDB',
    "KNApSAcK: integrated metabolite-plant species database": 'KnapsackDB',
    "MCDB: small molecule metabolites found in cow milk": 'MilkDB',
    "MiMeDB: taxonomic, microbiological, and body-site location data on most known human microbes": 'MimedbDB',
    "NANPDB: database of natural products isolated from native organisms of Northern Africa": 'NanpdbDB',
    "NPAtlas: natural products atlas database": 'NpatlasDB',
    "Phenol-Explorer: Polyphenols": 'PhenolDB',
    "PMHub: plant metabolite database": 'PmhubDB',
    "PMN: plant metabolite database": 'PmnDB',
    "SMPDB: small molecule pathway database": 'SmpdbDB',
    "STOFF-IDENT: database of water relevant substances": 'StoffDB',
    "T3DB: toxic exposome database": 'T3dbDB',
    "TCMSP: traditional chinese medicine systems pharmacology database": 'TcmspDB',
    "YMDB: yeast metabolome database": 'YmdbDB',
}


def _resolve_library_table(table_name):
    """Return the SQLite table for a UI label or an already-resolved table name."""
    resolved = _LIBRARY_TABLES.get(table_name, table_name)
    # The name is interpolated into SQL below (identifiers cannot be bound as
    # parameters), so only known table names are let through.
    if resolved not in _LIBRARY_TABLES.values():
        raise ValueError(f"Unknown structure library: {table_name!r}")
    return resolved


def _similarity_weights(collision_energy):
    """Return the (low, median, high) model weights for a collision energy."""
    if collision_energy <= 15:
        return 0.4, 0.3, 0.3
    if collision_energy <= 25:
        return 0.3, 0.4, 0.3
    if collision_energy > 25:
        return 0.2, 0.3, 0.5
    # Only reachable for NaN, which fails every comparison above.
    raise ValueError("collision_energy must be a number")


def _load_embedding(blob):
    """Decode one pickled embedding column into a float32 tensor."""
    # NOTE(security): pickle.loads can execute arbitrary code; this is only
    # acceptable because the blobs come from the project's own database file.
    return torch.tensor(np.array(pickle.loads(blob), dtype=np.float64)).float()


def retrieve_similarity_scores( conn_obj, table_name, target_mass,collision_energy, ms2_embedding_low, ms2_embedding_median, ms2_embedding_high):
    """Rank the library structures in the precursor mass window against a spectrum.

    Args:
        conn_obj: Open ``sqlite3`` connection to the structure database.
        table_name: Library label from the UI dropdown, or the table name itself.
        target_mass: Parent ion mass; 1.008 is subtracted before the lookup
            (presumably the proton of the [M+H]+ adduct, the only adduct the
            UI offers).
        collision_energy: Numeric collision energy selecting the model weights
            (<=15, <=25, >25).
        ms2_embedding_low: Query embedding from the low-energy model (1-D tensor).
        ms2_embedding_median: Query embedding from the median-energy model.
        ms2_embedding_high: Query embedding from the high-energy model.

    Returns:
        Up to ten ``((smiles,), weighted_similarity)`` tuples, best score first.
        The list is empty when no structure falls in the +/-20 ppm window.

    Raises:
        ValueError: If the library is unknown or ``collision_energy`` is NaN.
    """
    table = _resolve_library_table(table_name)
    weight_low, weight_median, weight_high = _similarity_weights(collision_energy)

    target_mass = target_mass-1.008
    tolerance = target_mass * 20 / 1000000

    # One query returns the SMILES together with its embeddings, replacing the
    # former follow-up lookup per candidate.
    query = f"""
    SELECT SMILES, low_energy_embedding, median_energy_embedding, high_energy_embedding
    FROM {table}
    WHERE SMILES IS NOT NULL
    AND MonoisotopicMass >= ? - ? AND MonoisotopicMass <= ? + ?
    """

    weighted_similarity_scores = []
    cur = conn_obj.cursor()
    try:
        cur.execute(query, (target_mass, tolerance, target_mass, tolerance))
        for smiles, low_blob, median_blob, high_blob in cur:
            # Skip incomplete rows instead of discarding the whole result.
            if low_blob is None or median_blob is None or high_blob is None:
                continue
            low_similarity = (ms2_embedding_low @ _load_embedding(low_blob).t()).item()
            median_similarity = (ms2_embedding_median @ _load_embedding(median_blob).t()).item()
            high_similarity = (ms2_embedding_high @ _load_embedding(high_blob).t()).item()
            weighted_similarity = (
                weight_low * low_similarity
                + weight_median * median_similarity
                + weight_high * high_similarity
            )
            # Callers index the SMILES as entry[0][0], so keep the 1-tuple.
            weighted_similarity_scores.append(((smiles,), weighted_similarity))
    finally:
        # Release the cursor even when a query or a decode step fails; the
        # shared connection itself stays open for later requests.
        cur.close()

    weighted_similarity_scores.sort(key=lambda x: x[1], reverse=True)
    return weighted_similarity_scores[:10]
def get_topK_result(library,ms_feature, smiles_feature, topK):
    """Return the ``topK`` library entries most similar to a spectrum embedding.

    Args:
        library: Sequence of candidates, aligned with the rows of
            ``smiles_feature``.
        ms_feature: 1-D query embedding.
        smiles_feature: 2-D tensor with one candidate embedding per row.
        topK: Number of hits wanted; capped at ``len(library)``.

    Returns:
        Tuple ``(indices, scores, candidates)`` of lists ordered by decreasing
        dot-product score.
    """
    library_size = len(library)
    if library_size <= topK:
        topK = library_size

    with torch.no_grad():
        query = ms_feature.unsqueeze(0)
        similarity = (query @ smiles_feature.t()).cpu()
        top_scores, top_indices = similarity.topk(topK, dim=1, largest=True, sorted=True)

    indices = top_indices.tolist()[0]
    scores = top_scores.tolist()[0]
    candidates = [library[idx] for idx in indices]
    return indices, scores, candidates
def rank_lib(conn_obj, database_name,spectrum_path,instrument_type,adduct,parent_Mass,collision_energy):
    """Rank structures of a built-in library against an uploaded spectrum.

    Args:
        conn_obj: SQLite connection to the structure database, or ``None`` if
            it has not been initialised yet (it is then opened on demand).
        database_name: Library label chosen in the UI.
        spectrum_path: Uploaded .msp file object exposing ``.name``.
        instrument_type: Accepted for interface compatibility; not used.
        adduct: Accepted for interface compatibility; not used.
        parent_Mass: Parent ion mass as a number or numeric string.
        collision_energy: Collision energy as a number or numeric string.

    Returns:
        List of ``(image, caption)`` tuples for the results gallery, best hit
        first; each caption carries the SMILES and its score.

    Raises:
        gr.Error: If the spectrum is missing or a numeric field cannot be parsed.
    """
    if spectrum_path is None:
        raise gr.Error("Please upload an MS/MS spectrum file in .msp format.")
    try:
        collision_energy = float(collision_energy)
        parent_Mass = float(parent_Mass)
    except (TypeError, ValueError) as err:
        raise gr.Error("Parent ion mass and collision energy must be numbers.") from err

    # The connection is loaded after page start-up; a click that arrives
    # before that finished would otherwise fail on `None.cursor()`.
    if conn_obj is None:
        conn_obj = initialize_db()

    ms2 = list(load_from_msp(spectrum_path.name))[0]
    ms2 = spectrum_processing(ms2)
    ms2_embedding_low, ms2_embedding_median, ms2_embedding_high = (
        torch.tensor(embedding).float() for embedding in MS2Embedding(ms2)
    )

    # Treat a missing result as "no hits" rather than crashing on None.
    top_10_smiles = retrieve_similarity_scores(
        conn_obj, database_name, parent_Mass, collision_energy,
        ms2_embedding_low, ms2_embedding_median, ms2_embedding_high,
    ) or []

    bw_draw_options = Draw.MolDrawOptions()
    bw_draw_options.useBWAtomPalette()

    image_descrips = []
    for smile_row, score in top_10_smiles:
        smi = smile_row[0]
        try:
            image = Draw.MolToImage(Chem.MolFromSmiles(smi), options=bw_draw_options)
        except Exception:
            # Keep the hit listed even when the structure cannot be drawn.
            # NOTE(review): confirm the gallery accepts this placeholder value.
            image = 'NAN'
        image_descrips.append((image, 'SMILES: '+smi+' ' + 'Score: '+str(score)))
    return image_descrips
def rank_user_lib(candidate_file,spectrum_path,instrument_type,adduct,parent_Mass,collision_energy):
    """Rank user-supplied candidate structures against an uploaded spectrum.

    Candidates are filtered to a 10 ppm window around ``parent_Mass - 1.008``,
    then a 30 ppm window, and finally left unfiltered if both are empty.

    Args:
        candidate_file: Uploaded CSV file object exposing ``.name``; the file
            must contain a ``'SMILES'`` column.
        spectrum_path: Uploaded .msp file object exposing ``.name``.
        instrument_type: Accepted for interface compatibility; not used.
        adduct: Accepted for interface compatibility; not used.
        parent_Mass: Parent ion mass as a number or numeric string.
        collision_energy: Collision energy as a number or numeric string.

    Returns:
        List of up to ten ``(image, caption)`` tuples for the results gallery,
        best hit first.

    Raises:
        gr.Error: If an input file is missing, a numeric field cannot be
            parsed, or the candidate file has no usable SMILES.
    """
    if candidate_file is None:
        raise gr.Error("Please upload a candidate structure file in csv format.")
    if spectrum_path is None:
        raise gr.Error("Please upload an MS/MS spectrum file in .msp format.")
    try:
        collision_energy = float(collision_energy)
        parent_Mass = float(parent_Mass)
    except (TypeError, ValueError) as err:
        raise gr.Error("Parent ion mass and collision energy must be numbers.") from err

    # Weights of the (low, median, high) energy models.
    if collision_energy <= 15:
        weights = (0.4, 0.3, 0.3)
    elif collision_energy <= 25:
        weights = (0.3, 0.4, 0.3)
    elif collision_energy > 25:
        weights = (0.2, 0.3, 0.5)
    else:
        # Only NaN fails all three comparisons.
        raise gr.Error("Collision energy must be a number.")

    users_candidate = pd.read_csv(candidate_file.name)
    if 'SMILES' not in users_candidate.columns:
        raise gr.Error("The candidate file must contain a 'SMILES' column.")

    # Collect the rows first and build the frame once; unparsable SMILES are
    # skipped so a single bad row does not abort the whole request.
    records = []
    for smi in users_candidate['SMILES']:
        mol = Chem.MolFromSmiles(smi) if isinstance(smi, str) else None
        if mol is None:
            continue
        records.append((smi, ExactMolWt(mol)))
    if not records:
        raise gr.Error("No valid SMILES found in the candidate file.")
    candidate = pd.DataFrame(records, columns=['SMILES', 'MonoisotopicMass'])

    query_ms = parent_Mass-1.008
    sub_reference_library = search_structure_from_mass(candidate, query_ms, 10)
    if len(sub_reference_library) == 0:
        sub_reference_library = search_structure_from_mass(candidate, query_ms, 30)
    if len(sub_reference_library) == 0:
        sub_reference_library = candidate.copy()
    candidate_smiles = list(sub_reference_library['SMILES'])

    ms2 = list(load_from_msp(spectrum_path.name))[0]
    ms2 = spectrum_processing(ms2)
    query_embeddings = [torch.tensor(embedding).float() for embedding in MS2Embedding(ms2)]

    # SMILES are encoded in batches of 64.
    contexts = [candidate_smiles[i:i + 64] for i in range(0, len(candidate_smiles), 64)]

    checkpoint_paths = (
        pretrain_model_path_low,
        pretrain_model_path_median,
        pretrain_model_path_high,
    )
    similarities = []
    for checkpoint_path, query_embedding in zip(checkpoint_paths, query_embeddings):
        # One model at a time, so only a single inference model is alive at
        # any moment.
        model_inference = ModelInference(config_path=config_path,
                                         pretrain_model_path=checkpoint_path,
                                         device="cpu")
        encoded = torch.cat([model_inference.smiles_encode(batch).cpu() for batch in contexts], 0)
        similarities.append((query_embedding @ encoded.t()).numpy())

    low_similarity, median_similarity, high_similarity = similarities
    weighted_similarity = (
        weights[0] * low_similarity
        + weights[1] * median_similarity
        + weights[2] * high_similarity
    )

    weighted_similarity_scores = list(zip(candidate_smiles, weighted_similarity))
    weighted_similarity_scores.sort(key=lambda x: x[1], reverse=True)
    top_10_smiles = weighted_similarity_scores[:10]

    bw_draw_options = Draw.MolDrawOptions()
    bw_draw_options.useBWAtomPalette()

    image_descrips = []
    for smi, score in top_10_smiles:
        try:
            image = Draw.MolToImage(Chem.MolFromSmiles(smi), options=bw_draw_options)
        except Exception:
            # Keep the hit listed even when the structure cannot be drawn.
            # NOTE(review): confirm the gallery accepts this placeholder value.
            image = 'NAN'
        image_descrips.append((image, 'SMILES: '+smi+' ' + 'Score: '+str(score)))
    return image_descrips
# ---------------------------------------------------------------------------
# Gradio UI definition and application start-up.
# NOTE(review): the indentation of this file was lost, so the nesting of the
# `with` layout blocks below cannot be determined from the text; restore it
# from the original layout before running.
# ---------------------------------------------------------------------------
with gr.Blocks(theme=seafoam) as demo:
gr.HTML(custom_css)
gr.Markdown('<div style="font-size:50px; font-weight:bold;">🔍 CSU-MS2 web server </div>')
# Per-session holder for the SQLite connection; filled by demo.load below.
db_conn_state = gr.State(None)
with gr.Row():
with gr.Column():
# NOTE(review): elem_classes is given with a leading dot here, unlike the
# other elem_classes values in this file; confirm which CSS rule is intended.
peak_data = gr.File(file_count="single", label="Upload MS/MS spectrum file in .msp format", elem_classes=".file-upload-height")
spectrum_output = gr.Plot(label="Mass Spectrum",elem_id="custom_plot")
# Redraw the spectrum plot whenever the uploaded file changes.
peak_data.change(fn=draw_mass_spectrum, inputs=[peak_data], outputs=[spectrum_output])
with gr.Row():
instru=gr.Dropdown(["HCD"], label="Instrument Type")
ionmode=gr.Dropdown(["[M+H]+"], label="Adduct Type")
par_ion_mass=gr.Textbox(label="Parent Ion Mass",placeholder="e.g., 180.00")
collision_e=gr.Textbox(label="collision energy", placeholder="e.g., 40")
# Example row that pre-fills the spectrum file and the four parameter fields.
gr.Examples(
examples=[
["example_spectrum_searched_csu-ms2-db.msp", "HCD", "[M+H]+", "336.1735", "40"]
],
inputs=[peak_data, instru, ionmode, par_ion_mass, collision_e],
outputs=[spectrum_output],
label="Upload Example Spectrum"
)
# Tab 1: search one of the built-in structure libraries.
with gr.Tab(label="📶 Struture library", elem_id='custom_tab'):
with gr.Column():
dataset = gr.Dropdown(["CSU_MS2_DB",
"BloodExp: blood exposome database",
"ChEBI: products of nature or synthetic products database",
"ChemFOnt: Biochemical database including primary metabolites, secondary metabolites, natural products, etc",
"ContaminantDB: Contaminant data from different online references and databases on contaminants",
"DrugBank: drug biochemical and pharmacological information database",
"ECMDB: database of small molecule metabolites found in or produced by Escherichia coli",
"Exposome-Explorer: biomarkers of exposure to environmental risk factors for diseases",
"Foodb: food constituent database",
"HMDB: human metabolome database",
"KEGG: a collection of small molecules, biopolymers, and other chemical substances",
"KNApSAcK: integrated metabolite-plant species database",
"MCDB: small molecule metabolites found in cow milk",
"MiMeDB: taxonomic, microbiological, and body-site location data on most known human microbes",
"NANPDB: database of natural products isolated from native organisms of Northern Africa",
"NPAtlas: natural products atlas database",
"Phenol-Explorer: Polyphenols",
"PMHub: plant metabolite database",
"PMN: plant metabolite database",
"SMPDB: small molecule pathway database",
"STOFF-IDENT: database of water relevant substances",
"T3DB: toxic exposome database",
"TCMSP: traditional chinese medicine systems pharmacology database",
"YMDB: yeast metabolome database"], label="Choose a structure library")
with gr.Column():
lib_button = gr.Button("Cross-Modal Retrieval")
lib_output = gr.Gallery(height='auto',columns=4,elem_classes="gallery-height",label='Cross-modal retrieval results')
# Tab 2: search a user-supplied candidate list.
with gr.Tab("📁 Upload structure file"):
with gr.Row():
with gr.Column():
# NOTE(review): the label spells the required column 'SMIELS', but
# rank_user_lib reads the 'SMILES' column of the uploaded CSV.
user_dataset= gr.File(file_count="single", label="Upload the candidate structure file in csv format, columns=['SMIELS']",elem_classes="file-upload-height2")
gr.Examples(
examples=[
["user-defined structure file for spectrum1.csv"]
],
inputs=[user_dataset],
label="Upload Example structural file"
)
with gr.Column():
user_button = gr.Button("Cross-Modal Retrieval")
user_output = gr.Gallery(height='auto',columns=4,elem_classes="gallery-height",label='Cross-modal retrieval results')
# Open the database on page load (not at import time) and keep the
# connection in db_conn_state.
demo.load(fn=initialize_db, inputs=None, outputs=db_conn_state, queue=True, show_progress="full")
# Both buttons share the spectrum and parameter inputs; only the candidate
# source differs (built-in library vs. uploaded CSV).
lib_button.click(rank_lib, inputs=[db_conn_state, dataset,peak_data,instru,ionmode,par_ion_mass,collision_e], outputs=lib_output)
user_button.click(rank_user_lib, inputs=[user_dataset,peak_data,instru,ionmode,par_ion_mass,collision_e], outputs=user_output)
# Start the web server with a public share link requested.
demo.launch(share=True)