"""parse_utils.py

Readers and writers for mass-spectra text formats: SIRIUS ``.ms`` files,
MGF, MSP, SIRIUS fragmentation-tree TSV exports, and msgym-like DataFrames.

Parsed spectra are represented throughout as ``(name, array)`` tuples where
``array`` holds one ``[mz, intensity]`` row per peak.
"""
from pathlib import Path
from typing import Tuple, List, Optional
from itertools import groupby

try:
    from tqdm import tqdm
except ImportError:  # progress bars are cosmetic; degrade to a no-op wrapper

    def tqdm(iterable, *args, **kwargs):
        """Fallback used when tqdm is not installed: return *iterable* as-is."""
        return iterable


import numpy as np
import pandas as pd


def _is_ms_header_line(line: str) -> bool:
    """Return True for SIRIUS ``.ms`` header (``>``) or comment (``#``) lines."""
    return line.startswith((">", "#"))


def _parse_ms_metadata(header_lines: List[str]) -> dict:
    """Turn the leading ``>key value`` / ``#key value`` lines into a dict."""
    entries = {}
    for line in header_lines:
        # Lines without a space carry no value and are ignored
        if " " not in line:
            continue
        if line.startswith("#INSTRUMENT TYPE"):
            # The only key that itself contains a space
            key = "#INSTRUMENT TYPE"
            entries[key[1:]] = line.split(key)[1].strip()
            continue
        key, value = line.split(" ", 1)
        key = key[1:]  # drop the leading ">" or "#"
        # Repeated keys are kept by suffixing primes: key, key', key'', ...
        while key in entries:
            key = f"{key}'"
        entries[key] = value
    return entries


def parse_spectra(spectra_file: str) -> Tuple[dict, List[Tuple[str, np.ndarray]]]:
    """parse_spectra.

    Parses spectra in the SIRIUS format and returns the file metadata along
    with every non-empty spectrum.

    The file is read as alternating runs of header lines (starting with ``>``
    or ``#``) and body lines. The first header run is the metadata; each later
    header run names a spectrum whose peaks are the body lines that follow it.
    Lines before the first header are ignored, and a header run with no body
    after it (e.g. at the end of the file) yields no spectrum.

    Args:
        spectra_file (str): Name of spectra file to parse

    Return:
        Tuple[dict, List[Tuple[str, np.ndarray]]]: metadata and list of spectra
            tuples containing name and array. The metadata additionally holds
            ``_FILE_PATH`` (the given path) and ``_FILE`` (its stem).
    """
    with open(spectra_file, "r") as fp:
        lines = [line.strip() for line in fp]

    # groupby's key is a bool, so consecutive groups strictly alternate
    groups = [
        (is_header, list(group))
        for is_header, group in groupby(lines, key=_is_ms_header_line)
    ]
    # Anything before the first header (e.g. leading blank lines) is not data
    if groups and not groups[0][0]:
        groups = groups[1:]

    header_groups = [group for is_header, group in groups if is_header]
    body_groups = [group for is_header, group in groups if not is_header]

    metadata = {}
    spectras = []
    for group_num, header_lines in enumerate(header_groups):
        # A trailing header run has no body after it
        body_lines = body_groups[group_num] if group_num < len(body_groups) else []

        # Get meta data
        if group_num == 0:
            metadata.update(_parse_ms_metadata(header_lines))
            continue

        # Get spectra; only the first two columns of each peak line are used
        spectra_header = header_lines[0].split(">")[1]
        peak_data = [
            [float(x) for x in peak.split()[:2]] for peak in body_lines if peak.strip()
        ]
        # Skip empty spectra
        if peak_data:
            spectras.append((spectra_header, np.vstack(peak_data)))

    metadata["_FILE_PATH"] = spectra_file
    metadata["_FILE"] = Path(spectra_file).stem
    return metadata, spectras


def spec_to_ms_str(
    spec: List[Tuple[str, np.ndarray]],
    essential_keys: dict,
    comments: Optional[dict] = None,
) -> str:
    """spec_to_ms_str.

    Turn spec arrays and info dicts into a str for an output file.

    The output is the ``>key value`` header lines, then the ``#key value``
    comment lines, a blank line, and each spectrum as ``>name`` followed by
    one ``mz intensity`` row per peak; spectra are separated by blank lines.

    Args:
        spec (List[Tuple[str, np.ndarray]]): spectra as (name, array) tuples
        essential_keys (dict): entries written as ``>key value`` lines
        comments (Optional[dict]): entries written as ``#key value`` lines.
            Defaults to no comments.

    Returns:
        str: file contents
    """
    if comments is None:
        comments = {}

    def pair_rows(rows):
        return "\n".join([f"{i} {j}" for i, j in rows])

    header = "\n".join(f">{k} {v}" for k, v in essential_keys.items())
    comment_str = "\n".join(f"#{k} {v}" for k, v in comments.items())
    spec_strs = [f">{name}\n{pair_rows(ar)}" for name, ar in spec]
    spec_str = "\n\n".join(spec_strs)
    return f"{header}\n{comment_str}\n\n{spec_str}"


def build_mgf_str(
    meta_spec_list: List[Tuple[dict, List[Tuple[str, np.ndarray]]]],
    merge_charges=True,
    parent_mass_keys=("PEPMASS", "parentmass", "PRECURSOR_MZ"),
) -> str:
    """build_mgf_str.

    Render (metadata, spectra) pairs as MGF text, one ``BEGIN IONS`` /
    ``END IONS`` entry per pair, with entries separated by blank lines.

    Each entry lists ``PEPMASS=<float>`` taken from the first key of
    ``parent_mass_keys`` present in the metadata (if any), then every metadata
    item as ``KEY=value`` (key upper-cased, spaces replaced by underscores),
    then all peaks of the entry merged and sorted by m/z.

    NOTE(review): a metadata key that is itself ``PEPMASS`` is written twice
    (once as the float, once verbatim); left unchanged to keep the output
    stable for existing consumers.

    Args:
        meta_spec_list (List[Tuple[dict, List[Tuple[str, np.ndarray]]]]):
            meta_spec_list
        merge_charges (bool): Merge all spectra of an entry into one peak
            list. Only ``True`` is implemented.
        parent_mass_keys: Metadata keys tried, in order, for the precursor
            mass.

    Returns:
        str: MGF file contents

    Raises:
        NotImplementedError: If ``merge_charges`` is False and there is at
            least one entry to write.
    """
    entries = []
    for meta, spec in tqdm(meta_spec_list):
        if not merge_charges:
            raise NotImplementedError()

        str_rows = ["BEGIN IONS"]

        # Try to add precursor mass
        for mass_key in parent_mass_keys:
            if mass_key in meta:
                str_rows.append(f"PEPMASS={float(meta[mass_key])}")
                break

        for k, v in meta.items():
            str_rows.append(f"{k.upper().replace(' ', '_')}={v}")

        arrays = [ar for _, ar in spec]
        # An entry without spectra is written with no peak rows
        if arrays:
            spec_ar = np.vstack(arrays)
            # Stable sort keeps the input order of peaks sharing an m/z
            spec_ar = spec_ar[np.argsort(spec_ar[:, 0], kind="stable")]
            str_rows.extend(f"{mz} {inten}" for mz, inten in spec_ar)

        str_rows.append("END IONS")
        entries.append("\n".join(str_rows))

    return "\n\n".join(entries)


def _parse_delimited_spectra(
    spectra_file: str,
    is_delimiter,
    meta_sep: str,
    ignored_lines: Tuple[str, ...] = (),
    max_num: Optional[int] = None,
    encoding: Optional[str] = None,
) -> List[Tuple[dict, List[Tuple[str, np.ndarray]]]]:
    """Shared reader for line-oriented formats split into entries by delimiter lines.

    Lines for which ``is_delimiter`` is true separate entries and are
    discarded. Inside an entry, lines containing ``meta_sep`` become metadata
    and every other non-blank line is read as an ``mz intensity`` peak.
    """
    parsed_spectra = []
    with open(spectra_file, "r", encoding=encoding) as fp:
        for is_header, group in tqdm(groupby(fp, is_delimiter)):
            if is_header:
                continue
            if max_num is not None and len(parsed_spectra) >= max_num:
                break

            meta = {}
            peaks = []
            for raw_line in group:
                line = raw_line.strip()
                if not line or line in ignored_lines:
                    continue
                if meta_sep in line:
                    k, v = [part.strip() for part in line.split(meta_sep, 1)]
                    meta[k] = v
                else:
                    # Extra columns (e.g. peak annotations) are ignored
                    mz, intens = line.split()[:2]
                    peaks.append((float(mz), float(intens)))

            # Entries without peaks are dropped.
            # Note: multiple scans are collapsed into one spectrum named "spec"
            if peaks:
                parsed_spectra.append((meta, [("spec", np.vstack(peaks))]))
    return parsed_spectra


def parse_spectra_msp(
    mgf_file: str, max_num: Optional[int] = None
) -> List[Tuple[dict, List[Tuple[str, np.ndarray]]]]:
    """parse_spectra_msp.

    Parses spectra in the MSP file format.

    Entries are delimited by lines starting with ``PEPMASS`` (those lines
    themselves are discarded). Within an entry, ``key: value`` lines become
    metadata and the remaining non-blank lines are peaks. Entries with no
    peaks are skipped. The file is read as UTF-8.

    Args:
        mgf_file (str) : Path of the MSP file
        max_num (Optional[int]): If set, only parse this many

    Return:
        List[Tuple[dict, List[Tuple[str, np.ndarray]]]]: metadata and list of spectra
            tuples containing name and array
    """
    return _parse_delimited_spectra(
        mgf_file,
        is_delimiter=lambda x: x.strip().startswith("PEPMASS"),
        meta_sep=":",
        max_num=max_num,
        encoding="utf-8",
    )


def parse_spectra_mgf(
    mgf_file: str, max_num: Optional[int] = None
) -> List[Tuple[dict, List[Tuple[str, np.ndarray]]]]:
    """parse_spectra_mgf.

    Parses spectra in the MGF file format.

    Entries are delimited by ``BEGIN IONS`` lines; ``END IONS`` lines are
    ignored. Within an entry, ``KEY=value`` lines become metadata and the
    remaining non-blank lines are peaks. Entries with no peaks are skipped.

    Args:
        mgf_file (str) : Path of the MGF file
        max_num (Optional[int]): If set, only parse this many

    Return:
        List[Tuple[dict, List[Tuple[str, np.ndarray]]]]: metadata and list of spectra
            tuples containing name and array
    """
    return _parse_delimited_spectra(
        mgf_file,
        is_delimiter=lambda x: x.strip() == "BEGIN IONS",
        meta_sep="=",
        ignored_lines=("END IONS", "BEGIN IONS"),
        max_num=max_num,
    )


def parse_tsv_spectra(spectra_file: str) -> List[Tuple[str, np.ndarray]]:
    """parse_tsv_spectra.

    Parses spectra returned from sirius fragmentation tree.

    The first line is treated as a column header and skipped. From every
    other non-blank line, the tab-separated column at index 3 is read as the
    exact mass and the column at index 1 as the intensity.

    Args:
        spectra_file (str): Name of spectra tsv file to parse

    Return:
        List[Tuple[str, np.ndarray]]]: list of spectra tuples containing
            name and array. This is used to maintain consistency with the
            parse_spectra output; it always holds a single ``"sirius_spec"``
            entry whose array has one ``[exact_mass, intensity]`` row per peak.
    """
    rows = []
    with open(spectra_file, "r") as fp:
        next(fp, None)  # skip the column-header line
        for line in fp:
            line = line.strip()
            if not line:
                continue
            fields = line.split("\t")
            intensity = float(fields[1])
            exact_mass = float(fields[3])
            rows.append([exact_mass, intensity])

    # reshape keeps a consistent (n, 2) shape even when there are no peaks
    output_spec = np.array(rows, dtype=float).reshape(-1, 2)
    return [("sirius_spec", output_spec)]


# YZC parse msgym-like formatted data
def parse_spectra_msgym(
    df: pd.DataFrame,
) -> List[Tuple[dict, List[Tuple[str, np.ndarray]]]]:
    """parse_spectra_msgym.

    Convert a msgym-like table into the (metadata, spectra) structure produced
    by the other parsers in this module.

    Each row must provide ``mzs`` and ``intensities`` as comma-separated
    strings, plus ``spec`` and ``parent_mass``.

    Args:
        df (pd.DataFrame): One spectrum per row

    Return:
        List[Tuple[dict, List[Tuple[str, np.ndarray]]]]: for each row, a
            metadata dict with ``ID`` and ``parentmass`` and a single
            (name, array) spectrum
    """
    parsed_spectra = []
    for _, row in df.iterrows():
        mz_values = [float(tok) for tok in row["mzs"].split(",")]
        intensity_values = [float(tok) for tok in row["intensities"].split(",")]
        peaks = np.vstack(list(zip(mz_values, intensity_values)))

        spec_name = row["spec"]
        meta = {"ID": spec_name, "parentmass": row["parent_mass"]}
        parsed_spectra.append((meta, [(spec_name, peaks)]))
    return parsed_spectra