yzhouchen001's picture
cleaned up
2c0063e
""" parse_utils.py """
from pathlib import Path
from typing import Tuple, List, Optional
from itertools import groupby
from tqdm import tqdm
import numpy as np
import pandas as pd
def parse_spectra(spectra_file: str) -> Tuple[dict, List[Tuple[str, np.ndarray]]]:
    """parse_spectra.

    Parses spectra in the SIRIUS format and returns the file metadata
    together with the named peak lists.

    The file is read as alternating groups of header lines (starting with
    ">" or "#") and non-header lines. The first header group is treated as
    metadata; every later header group names a spectrum whose peaks are the
    non-header lines that follow it.

    Args:
        spectra_file (str): Name of spectra file to parse

    Return:
        Tuple[dict, List[Tuple[str, np.ndarray]]]: metadata and list of spectra
            tuples containing name and array. Spectra without any peak lines
            are omitted. The metadata additionally carries "_FILE_PATH" (the
            path as given) and "_FILE" (the file stem).
    """
    # Use a context manager so the handle is closed deterministically.
    with open(spectra_file, "r") as fp:
        lines = [raw_line.strip() for raw_line in fp]

    metadata = {}
    spectras = []
    my_iterator = groupby(lines, lambda line: line.startswith((">", "#")))

    for group_num, (_, header_group) in enumerate(my_iterator):
        # The sub-iterator must be materialized before advancing groupby.
        group_lines = list(header_group)
        # A file may end on a header with nothing after it; fall back to an
        # empty peak group instead of leaking StopIteration to the caller.
        _, subject_group = next(my_iterator, (False, ()))
        subject_lines = list(subject_group)

        # Get spectra
        if group_num > 0:
            spectra_header = group_lines[0].split(">")[1]
            peak_data = [
                [float(x) for x in peak.split()[:2]]
                for peak in subject_lines
                if peak.strip()
            ]
            # Only keep spectra that actually contain peaks
            if peak_data:
                spectras.append((spectra_header, np.vstack(peak_data)))
        # Get meta data
        else:
            entries = {}
            for entry_line in group_lines:
                if " " not in entry_line:
                    continue
                if entry_line.startswith("#INSTRUMENT TYPE"):
                    # This key itself contains a space, so it cannot be
                    # split on the first space like the other entries.
                    key = "#INSTRUMENT TYPE"
                    entries[key[1:]] = entry_line.split(key)[1].strip()
                else:
                    start, end = entry_line.split(" ", 1)
                    start = start[1:]  # drop the leading ">" / "#"
                    # Repeated keys are kept by appending "'" until unique
                    while start in entries:
                        start = f"{start}'"
                    entries[start] = end
            metadata.update(entries)

    metadata["_FILE_PATH"] = spectra_file
    metadata["_FILE"] = Path(spectra_file).stem
    return metadata, spectras
def spec_to_ms_str(
    spec: List[Tuple[str, np.ndarray]],
    essential_keys: dict,
    comments: Optional[dict] = None,
) -> str:
    """spec_to_ms_str.

    Turn spec arrays and info dicts into a str for an output file.

    The output consists of one ">key value" line per essential key, one
    "#key value" line per comment, a blank line, and then each spectrum as a
    ">name" line followed by one "mz intensity" line per peak; spectra are
    separated by blank lines.

    Args:
        spec (List[Tuple[str, np.ndarray]]): Named spectra; each array is
            iterated as rows of two values.
        essential_keys (dict): Entries written as ">" header lines
        comments (Optional[dict]): Entries written as "#" comment lines.
            Defaults to no comments.

    Returns:
        str: The formatted file contents
    """
    # Avoid a shared mutable default argument.
    if comments is None:
        comments = {}

    def pair_rows(rows):
        return "\n".join(f"{i} {j}" for i, j in rows)

    header = "\n".join(f">{k} {v}" for k, v in essential_keys.items())
    # The comment section must be built from `comments`, not from
    # `essential_keys`.
    comment_str = "\n".join(f"#{k} {v}" for k, v in comments.items())
    spec_str = "\n\n".join(f">{name}\n{pair_rows(ar)}" for name, ar in spec)
    return f"{header}\n{comment_str}\n\n{spec_str}"
def build_mgf_str(
    meta_spec_list: List[Tuple[dict, List[Tuple[str, np.ndarray]]]],
    merge_charges=True,
    parent_mass_keys=("PEPMASS", "parentmass", "PRECURSOR_MZ"),
) -> str:
    """build_mgf_str.

    Builds the text of an MGF file: one "BEGIN IONS" / "END IONS" section per
    entry, with sections separated by a blank line.

    Args:
        meta_spec_list (List[Tuple[dict, List[Tuple[str, np.ndarray]]]]):
            Pairs of metadata dict and list of named peak arrays
        merge_charges (bool): If True, all peak arrays of an entry are
            stacked into one peak list sorted by the first column. False is
            not implemented.
        parent_mass_keys: Metadata keys checked in order; the first one
            present is written as a float "PEPMASS=" line.

    Returns:
        str: The MGF formatted string

    Raises:
        NotImplementedError: If merge_charges is False and there is at least
            one entry to process.
    """
    entries = []
    for meta, spec in tqdm(meta_spec_list):
        str_rows = ["BEGIN IONS"]

        # Try to add precursor mass
        for mass_key in parent_mass_keys:
            if mass_key in meta:
                str_rows.append(f"PEPMASS={float(meta[mass_key])}")
                break

        # All metadata is written with upper-cased, underscore-joined keys
        str_rows.extend(
            f"{k.upper().replace(' ', '_')}={v}" for k, v in meta.items()
        )

        if not merge_charges:
            raise NotImplementedError()

        # Skip empty arrays; np.vstack cannot stack an empty list and an
        # entry without peaks should simply produce no peak rows.
        peak_arrays = [ar for _, ar in spec if len(ar) > 0]
        if peak_arrays:
            spec_ar = np.vstack(peak_arrays)
            # Stable sort keeps the input order of peaks sharing an m/z
            spec_ar = spec_ar[np.argsort(spec_ar[:, 0], kind="stable")]
            str_rows.extend(f"{i} {j}" for i, j in spec_ar)

        str_rows.append("END IONS")
        entries.append("\n".join(str_rows))

    return "\n\n".join(entries)
def parse_spectra_msp(
    mgf_file: str, max_num: Optional[int] = None
) -> List[Tuple[dict, List[Tuple[str, np.ndarray]]]]:
    """parse_spectra_msp.

    Parses spectra in the MSP file format.

    Lines whose stripped text starts with "PEPMASS" act as record
    separators and are themselves discarded. Within a record, a line
    containing ":" is stored as a "key: value" metadata entry and any other
    non-blank line is read as a peak, using its first two whitespace
    separated columns as m/z and intensity. Records without peaks are
    dropped.

    Args:
        mgf_file (str) : Path of the file to parse
        max_num (Optional[int]): If set, only parse this many

    Return:
        List[Tuple[dict, List[Tuple[str, np.ndarray]]]]: metadata and list of spectra
            tuples containing name and array
    """

    def is_separator(raw_line: str) -> bool:
        return raw_line.strip().startswith("PEPMASS")

    parsed_spectra = []
    with open(mgf_file, "r", encoding="utf-8") as fp:
        for is_header, group in tqdm(groupby(fp, is_separator)):
            # Stop once exactly max_num spectra were collected
            if max_num is not None and len(parsed_spectra) >= max_num:
                break
            if is_header:
                continue

            meta = {}
            # Note: Sometimes we have multiple scans
            # This file has them collapsed into a single spectrum
            cur_spectra_name = "spec"
            cur_spectra = []
            for line in group:
                line = line.strip()
                if not line:
                    continue
                if ":" in line:
                    k, v = [i.strip() for i in line.split(":", 1)]
                    meta[k] = v
                else:
                    # Ignore any columns beyond m/z and intensity
                    mz, intens = line.split()[:2]
                    cur_spectra.append((float(mz), float(intens)))

            if cur_spectra:
                spectra = [(cur_spectra_name, np.vstack(cur_spectra))]
                parsed_spectra.append((meta, spectra))
    return parsed_spectra
def parse_spectra_mgf(
    mgf_file: str, max_num: Optional[int] = None
) -> List[Tuple[dict, List[Tuple[str, np.ndarray]]]]:
    """parse_spectra_mgf.

    Parses spectra in the MGF file format.

    Records are delimited by "BEGIN IONS" lines. Within a record, a line
    containing "=" is stored as a "key=value" metadata entry and any other
    non-blank line (except "END IONS") is read as a peak, using its first
    two whitespace separated columns as m/z and intensity. Records without
    peaks are dropped.

    Args:
        mgf_file (str) : Path of the file to parse
        max_num (Optional[int]): If set, only parse this many

    Return:
        List[Tuple[dict, List[Tuple[str, np.ndarray]]]]: metadata and list of spectra
            tuples containing name and array
    """

    def is_begin(raw_line: str) -> bool:
        return raw_line.strip() == "BEGIN IONS"

    parsed_spectra = []
    with open(mgf_file, "r") as fp:
        for is_header, group in tqdm(groupby(fp, is_begin)):
            # Stop once exactly max_num spectra were collected
            if max_num is not None and len(parsed_spectra) >= max_num:
                break
            if is_header:
                continue

            meta = {}
            # Note: Sometimes we have multiple scans
            # This mgf has them collapsed
            cur_spectra_name = "spec"
            cur_spectra = []
            for line in group:
                line = line.strip()
                if not line or line in ("END IONS", "BEGIN IONS"):
                    continue
                if "=" in line:
                    k, v = [i.strip() for i in line.split("=", 1)]
                    meta[k] = v
                else:
                    # Ignore any columns beyond m/z and intensity
                    mz, intens = line.split()[:2]
                    cur_spectra.append((float(mz), float(intens)))

            if cur_spectra:
                spectra = [(cur_spectra_name, np.vstack(cur_spectra))]
                parsed_spectra.append((meta, spectra))
    return parsed_spectra
def parse_tsv_spectra(spectra_file: str) -> List[Tuple[str, np.ndarray]]:
    """parse_tsv_spectra.

    Parses spectra returned from sirius fragmentation tree.

    The first line of the file is skipped. From every following line the
    tab-separated column at index 1 is read as the intensity and the column
    at index 3 as the exact mass.

    Args:
        spectra_file (str): Name of spectra tsv file to parse

    Return:
        List[Tuple[str, np.ndarray]]]: single-element list holding the tuple
            ("sirius_spec", array) whose rows are [exact_mass, intensity].
            The list-of-tuples shape is used to maintain consistency with
            the parse_spectra output
    """
    peaks = []
    with open(spectra_file, "r") as handle:
        # Discard the first row before reading peaks
        next(handle, None)
        for raw_row in handle:
            fields = raw_row.strip().split("\t")
            intensity = float(fields[1])
            exact_mass = float(fields[3])
            peaks.append([exact_mass, intensity])

    return [("sirius_spec", np.array(peaks))]
# YZC parse msgym-like formatted data
def parse_spectra_msgym(df):
    """parse_spectra_msgym.

    Converts a table of spectra into the (metadata, spectra) structure
    produced by the other parsers in this module.

    Args:
        df: Table iterated with ``iterrows``. Each row must provide 'mzs' and
            'intensities' as comma separated number strings, plus 'spec' and
            'parent_mass'.

    Return:
        list: One ``(meta, [(name, array)])`` tuple per row, where meta holds
            'ID' (the row's 'spec') and 'parentmass' (the row's
            'parent_mass') and array has one [mz, intensity] row per peak.
    """

    def _to_floats(text):
        return [float(token) for token in text.split(',')]

    results = []
    for _, record in df.iterrows():
        mz_values = _to_floats(record['mzs'])
        intensity_values = _to_floats(record['intensities'])
        # zip pairs values positionally and stops at the shorter list
        peaks = np.vstack(list(zip(mz_values, intensity_values)))
        name = record['spec']
        meta = {'ID': name, 'parentmass': record['parent_mass']}
        results.append((meta, [(name, peaks)]))
    return results