FLARE / flare /subformula_assign /utils /spectra_utils.py
yzhouchen001's picture
cleaned up
2c0063e
""" spectra_utils.py"""
import logging
import numpy as np
from typing import List
from .chem_utils import (
vec_to_formula,
get_all_subsets,
ion_to_mass,
ION_LST,
clipped_ppm,
)
def bin_spectra(
    spectras: List[np.ndarray], num_bins: int = 2000, upper_limit: int = 1000
) -> np.ndarray:
    """Accumulate the peak intensities of each spectrum onto a shared m/z grid.

    Args:
        spectras (List[np.ndarray]): Spectra to featurize. Each entry is
            indexed as a 2D array whose column 0 holds m/z values and whose
            column 1 holds intensities.
        num_bins (int): Number of grid points spaced evenly over
            [0, upper_limit]
        upper_limit (int): Max m/z to consider featurizing

    Return:
        np.ndarray of shape [len(spectras), num_bins]; row i holds the summed
        intensity per bin for spectrum i
    """
    bin_edges = np.linspace(0, upper_limit, num=num_bins)
    total_bins = len(bin_edges)
    binned = np.zeros((len(spectras), total_bins))

    # Iterating over ``binned`` yields row views, so writes land in ``binned``
    for row, peaks in zip(binned, spectras):
        slots = np.digitize(peaks[:, 0], bins=bin_edges)

        # digitize returns ``total_bins`` for m/z at or beyond the last edge;
        # those peaks are out of range and get dropped
        keep = slots < total_bins

        # Several peaks can fall into the same bin, so use the unbuffered
        # ufunc form which adds every occurrence instead of only the last one
        np.add.at(row, slots[keep], peaks[keep, 1])

    return binned
def merge_norm_spectra(spec_tuples, precision=4) -> np.ndarray:
    """merge_norm_spectra.

    Take a list of (mz, inten) arrays, merge peaks whose m/z agree after
    rounding to ``precision`` decimals, and scale intensities so the largest
    one equals 1.

    Note this uses _max_ merging: when several peaks share a rounded m/z only
    the largest intensity is kept.

    Args:
        spec_tuples: Iterable of spectra; each spectrum is iterated row by
            row and every row must unpack into ``(mz, inten)``.
        precision: Number of decimals used to decide that two m/z values
            refer to the same peak.

    Return:
        np.ndarray of shape [num_merged_peaks, 2] (float) holding
        (mz, normalized intensity) in first-seen order. An empty [0, 2] array
        is returned when there are no peaks at all.
    """
    mz_to_inten_pair = {}
    for spec in spec_tuples:
        for mz, inten in spec:
            mz_key = np.round(mz, precision)
            best = mz_to_inten_pair.get(mz_key)
            if best is None:
                # The first peak seen at this rounded m/z keeps its raw m/z
                mz_to_inten_pair[mz_key] = (mz, inten)
            elif inten > best[1]:
                # NOTE: a replacing peak is reported at the rounded m/z
                mz_to_inten_pair[mz_key] = (mz_key, inten)

    # np.vstack / max() both fail on empty input, so answer explicitly
    if not mz_to_inten_pair:
        return np.zeros((0, 2))

    # Force float so that the normalization below is not truncated when the
    # inputs happen to be integer typed
    merged_spec = np.array(list(mz_to_inten_pair.values()), dtype=float)

    top_inten = merged_spec[:, 1].max()
    if top_inten != 0:
        # Skip the division for an all-zero spectrum to avoid 0/0 -> NaN
        merged_spec[:, 1] = merged_spec[:, 1] / top_inten
    return merged_spec
def norm_spectrum(binned_spec: np.ndarray) -> np.ndarray:
    """Scale every spectral channel so that its largest value becomes 1.

    Channels (rows) whose maximum is not positive are left untouched.
    The array passed in is modified in place and also returned.

    Args:
        binned_spec (np.ndarray) : 2D array with one spectrum per row

    Return:
        np.ndarray: the same array, where each scaled channel has max 1
    """
    row_peak = binned_spec.max(axis=1)

    # Only rows with a strictly positive maximum can be divided safely
    scalable = row_peak > 0
    divisors = row_peak[scalable][:, np.newaxis]
    binned_spec[scalable] = binned_spec[scalable] / divisors
    return binned_spec
def process_spec_file(meta, tuples, precision=4, max_inten=0.001, max_peaks=60):
    """process_spec_file.

    Merge all scans of one spectrum file into a single cleaned peak list.

    Steps: merge peaks that share an m/z after rounding (keeping the first
    seen m/z and the largest intensity), drop peaks heavier than
    ``parentmass + 1``, scale intensities to a maximum of 1, take the square
    root, then keep the strongest peaks via ``max_inten_spec``.

    The input arrays in ``tuples`` are not modified.

    Args:
        meta: Mapping of spectrum metadata. The parent mass is read from the
            first present key among "parentmass", "PARENTMASS", "PEPMASS";
            when none is present a very large value is used so that no peak
            is removed by the parent-mass filter.
        tuples: Iterable of ``(name, peak_array)`` pairs; each peak array is
            iterated row by row as ``(mz, inten)``. Empty arrays are skipped.
        precision: Number of decimals used to decide that two m/z values
            refer to the same peak.
        max_inten: Despite its name this is the minimum (square-rooted)
            intensity a peak must exceed to be kept.
        max_peaks: Max number of peaks to keep.

    Return:
        np.ndarray of shape [num_peaks, 2] with (mz, sqrt-normalized
        intensity), or None when there are no input peaks or no peak survives
        the parent-mass filter.
    """
    for mass_key in ("parentmass", "PARENTMASS", "PEPMASS"):
        if mass_key in meta:
            parentmass = meta[mass_key]
            break
    else:
        logging.debug("missing parentmass for spec")
        parentmass = 1000000
    parentmass = float(parentmass)

    # First norm spectra
    fused_tuples = [x for _, x in tuples if x.size > 0]
    if not fused_tuples:
        return None

    # Each accumulator row is a private list; storing the incoming row view
    # and updating it in place would silently overwrite the caller's arrays
    mz_to_inten_pair = {}
    for spec in fused_tuples:
        for mz, inten in spec:
            mz_ind = np.round(mz, precision)
            cur_pair = mz_to_inten_pair.get(mz_ind)
            if cur_pair is None:
                mz_to_inten_pair[mz_ind] = [mz, inten]
            elif inten > cur_pair[1]:
                cur_pair[1] = inten

    # dicts keep insertion order, so peaks stay in first-seen order
    merged_spec = np.array(list(mz_to_inten_pair.values()), dtype=float)
    merged_spec = merged_spec[merged_spec[:, 0] <= (parentmass + 1)]

    # Every peak may lie above the parent mass (erroneous files)
    if merged_spec.shape[0] == 0:
        return None

    top_inten = merged_spec[:, 1].max()
    if top_inten != 0:
        # Skip the division for an all-zero spectrum to avoid 0/0 -> NaN
        merged_spec[:, 1] = merged_spec[:, 1] / top_inten

    # Sqrt intensities here
    merged_spec[:, 1] = np.sqrt(merged_spec[:, 1])

    merged_spec = max_inten_spec(
        merged_spec, max_num_inten=max_peaks, inten_thresh=max_inten
    )
    return merged_spec
def max_inten_spec(spec, max_num_inten: int = 60, inten_thresh: float = 0):
    """Keep only the strongest peaks of a spectrum.

    Args:
        spec: 2D spectra array; column 0 is m/z and column 1 is intensity
        max_num_inten: Max number of peaks to keep (None keeps all of them)
        inten_thresh: A peak is kept only if its intensity is strictly
            greater than this value

    Return:
        New [num_kept, 2] array of (m/z, intensity) ordered by decreasing
        intensity
    """
    mz_col, inten_col = spec[:, 0], spec[:, 1]

    # Strongest peaks first; a ``None`` stop in the slice keeps every peak
    strongest = np.argsort(inten_col)[::-1][:max_num_inten]
    top_mz, top_inten = mz_col[strongest], inten_col[strongest]

    above = top_inten > inten_thresh
    return np.column_stack((top_mz[above], top_inten[above]))
def max_thresh_spec(spec: np.ndarray, max_peaks=100, inten_thresh=0.003):
    """Select the most intense peaks that also clear an intensity floor.

    Args:
        spec (np.ndarray): 2D array; column 0 is m/z and column 1 is
            intensity
        max_peaks: Max num peaks to keep
        inten_thresh: Peaks must have intensity strictly above this value

    Return:
        New [num_kept, 2] array of (m/z, intensity) sorted from most to
        least intense
    """
    # Row indices of the ``max_peaks`` most intense peaks, strongest first
    ranked_rows = np.argsort(spec[:, 1])[::-1][:max_peaks]
    ranked = spec[ranked_rows]

    # Then discard whatever is too weak
    kept = ranked[ranked[:, 1] > inten_thresh]
    return np.stack((kept[:, 0], kept[:, 1]), axis=1)
def assign_subforms(form, spec, ion_type, mass_diff_thresh=15):
    """Assign the closest candidate sub-formula to every peak of a spectrum.

    For each peak the candidate (sub-formula mass + ion mass) nearest in m/z
    is chosen; peaks whose relative error is not below ``mass_diff_thresh``
    are dropped. When several peaks map to the same sub-formula, only the
    first one is reported and the intensities of the later ones are added to
    it.

    Args:
        form: Candidate formula, passed to ``get_all_subsets``, which is
            expected to return (sub-formula vectors, their masses)
            -- TODO confirm against chem_utils.
        spec: 2D array; column 0 is m/z and column 1 is intensity. It is not
            modified.
        ion_type: Key into ``ion_to_mass`` giving the mass shift added to
            every sub-formula mass.
        mass_diff_thresh (int, optional): Upper bound (exclusive) on the
            value returned by ``clipped_ppm`` for a peak to be kept
            (presumably ppm). Defaults to 15.

    Returns:
        dict with keys "cand_form", "cand_ion" and "output_tbl".
        "output_tbl" is None when no peak could be assigned, otherwise a
        dict of equally long lists: "mz", "ms2_inten", "mono_mass",
        "abs_mass_diff", "mass_diff", "formula", "ions".
    """
    cross_prod, masses = get_all_subsets(form)
    spec_masses, spec_intens = spec[:, 0], spec[:, 1]

    masses_with_ion = masses + ion_to_mass[ion_type]
    ion_types = np.array([ion_type] * len(masses_with_ion))

    # [num_peaks, num_candidates] table of absolute mass errors
    mass_diffs = np.abs(spec_masses[:, None] - masses_with_ion[None, :])

    formula_inds = mass_diffs.argmin(-1)
    min_mass_diff = mass_diffs[np.arange(len(mass_diffs)), formula_inds]
    rel_mass_diff = clipped_ppm(min_mass_diff, spec_masses)

    # Filter by mass diff threshold (ppm)
    valid_mask = rel_mass_diff < mass_diff_thresh
    spec_masses = spec_masses[valid_mask]
    # Boolean-mask indexing copies, so the in-place sums below do not write
    # back into the caller's ``spec``
    spec_intens = spec_intens[valid_mask]
    min_mass_diff = min_mass_diff[valid_mask]
    rel_mass_diff = rel_mass_diff[valid_mask]
    formula_inds = formula_inds[valid_mask]

    formulas = np.array([vec_to_formula(j) for j in cross_prod[formula_inds]])
    formula_masses = masses_with_ion[formula_inds]
    ion_types = ion_types[formula_inds]

    # Build mask for uniqueness on formula and ionization
    # note that ionization are all the same for one subformula assignment
    # hence we only need to consider the uniqueness of the formula
    first_idx_of_formula = {}
    uniq_mask = np.zeros(len(formulas), dtype=bool)
    for idx, formula in enumerate(formulas):
        first_idx = first_idx_of_formula.get(formula)
        if first_idx is None:
            # First peak for this formula: remember it and keep it
            first_idx_of_formula[formula] = idx
            uniq_mask[idx] = True
        else:
            # Repeat assignment: fold its intensity into the kept peak
            spec_intens[first_idx] += spec_intens[idx]

    spec_masses = spec_masses[uniq_mask]
    spec_intens = spec_intens[uniq_mask]
    min_mass_diff = min_mass_diff[uniq_mask]
    rel_mass_diff = rel_mass_diff[uniq_mask]
    formula_masses = formula_masses[uniq_mask]
    formulas = formulas[uniq_mask]
    ion_types = ion_types[uniq_mask]

    # To calculate explained intensity, preserve the original normalized
    # intensity
    if spec_intens.size == 0:
        output_tbl = None
    else:
        output_tbl = {
            "mz": list(spec_masses),
            "ms2_inten": list(spec_intens),
            "mono_mass": list(formula_masses),
            "abs_mass_diff": list(min_mass_diff),
            "mass_diff": list(rel_mass_diff),
            "formula": list(formulas),
            "ions": list(ion_types),
        }

    output_dict = {
        "cand_form": form,
        "cand_ion": ion_type,
        "output_tbl": output_tbl,
    }
    return output_dict
def get_output_dict(
    spec_name: str,
    spec: np.ndarray,
    form: str,
    mass_diff_type: str,
    mass_diff_thresh: float,
    ion_type: str,
) -> dict:
    """Assign sub-formulae of a candidate formula to the peaks of a spectrum.

    This function attempts to take an array of mass intensity values and
    assign formula subsets to subpeaks. It is a guarded wrapper around
    ``assign_subforms``.

    Args:
        spec_name (str): Name of the spectrum; not used by this function.
        spec (np.ndarray): Array of (m/z, intensity) peaks, or None when no
            usable spectrum is available.
        form (str): Candidate formula.
        mass_diff_type (str): Must be "ppm".
        mass_diff_thresh (float): Mass tolerance forwarded to
            ``assign_subforms``.
        ion_type (str): Ion label; must be a member of ``ION_LST`` for an
            assignment to be attempted.

    Returns:
        dict: Keys "cand_form", "cand_ion" and "output_tbl". "output_tbl" is
        None when ``spec`` is None or ``ion_type`` is not in ``ION_LST``.
    """
    assert mass_diff_type == "ppm"

    # spec is None for some erroneous MS2 files, e.g. when every subpeak has
    # an mz larger than the parentmass; an unknown ion is handled the same way
    if spec is None or ion_type not in ION_LST:
        return {"cand_form": form, "cand_ion": ion_type, "output_tbl": None}

    return assign_subforms(form, spec, ion_type, mass_diff_thresh=mass_diff_thresh)