Spaces:
Sleeping
Sleeping
| from os import listdir | |
| from os.path import isfile, join | |
| from pathlib import Path | |
| import shutil | |
| import re | |
| import math | |
| import numpy as np | |
| from src.snapconfig import config | |
def create_out_dir(dir_path, exist_ok=True):
    """Prepare the output directory and its 'spectra' / 'peptides' subfolders.

    Args:
        dir_path: Location of the output directory. Missing parent
            directories are created as needed.
        exist_ok: If True, an existing directory (and anything already in it)
            is reused. If False, an existing directory is deleted together
            with all of its contents and created afresh.

    Raises:
        FileExistsError: If ``dir_path`` exists but is not a directory.
    """
    out_path = Path(dir_path)

    # Wipe a stale directory only when the caller explicitly asked for it.
    if out_path.is_dir() and not exist_ok:
        shutil.rmtree(out_path)

    # exist_ok=True only tolerates an existing *directory*; a regular file
    # at this path still raises FileExistsError.
    out_path.mkdir(parents=True, exist_ok=True)

    # When the directory is being reused the subfolders may already be
    # there, so creating them must not fail.
    for sub_dir in ('spectra', 'peptides'):
        (out_path / sub_dir).mkdir(exist_ok=True)
def verify_in_dir(dir_path, ext, ignore_list=None):
    """Return the paths of all regular files in ``dir_path`` ending in ``.ext``.

    Hidden files (names starting with '.'), sub-directories, and any file
    whose name appears in ``ignore_list`` are skipped. The result is sorted
    by file name so that repeated runs process files in the same order.

    Args:
        dir_path: Directory to scan (not recursive).
        ext: Required extension, without the leading dot (e.g. ``"mgf"``).
        ignore_list: Optional collection of file names to exclude.

    Returns:
        A non-empty list of ``join(dir_path, name)`` paths.

    Raises:
        AssertionError: If ``dir_path`` is not an existing directory, or if
            no matching file is found.
    """
    # AssertionError is raised explicitly (rather than via ``assert``) so the
    # checks still run under ``python -O`` while keeping the exception type
    # that existing callers may already handle.
    if not Path(dir_path).is_dir():
        raise AssertionError(
            'input directory does not exist: {}'.format(dir_path))

    ignored = () if ignore_list is None else ignore_list

    files = []
    for name in sorted(listdir(dir_path)):
        if name.startswith('.') or name in ignored:
            continue
        # Require a real "<stem>.<ext>" name; a file called just "<ext>"
        # has no extension and must not match.
        stem, dot, suffix = name.rpartition('.')
        if not dot or suffix != ext:
            continue
        full_path = join(dir_path, name)
        if isfile(full_path):
            files.append(full_path)

    if not files:
        raise AssertionError(
            'no .{} files found in {}'.format(ext, dir_path))
    return files
def isfloat(str_float):
    """Return True if ``float(str_float)`` succeeds, otherwise False.

    Anything ``float()`` accepts counts, including surrounding whitespace
    and the special spellings 'nan' / 'inf'. Values that ``float()`` cannot
    handle at all (e.g. ``None`` or a list) yield False instead of raising.
    """
    try:
        float(str_float)
    except (TypeError, ValueError):
        # ValueError: unparsable string; TypeError: unsupported input type.
        return False
    return True
def mod_repl(match):
    """Regex substitution callback mapping a numeric match to a mod symbol.

    The matched text is parsed as a float, rounded to two decimals and
    converted back to a string; that string is looked up in
    ``config.ModCHAR``. An empty string is returned when there is no entry.
    """
    rounded = round(float(match.group(0)), 2)
    key = str(rounded)
    if key in config.ModCHAR:
        return config.ModCHAR[key]
    return ""
def mod_repl_2(match):
    """Regex substitution callback that rewrites a number as ``[<rounded>]``.

    The matched text is parsed as a float and rounded to two decimals; the
    result is returned wrapped in square brackets.
    """
    rounded = round(float(match.group(0)), 2)
    return '[' + '{}'.format(rounded) + ']'
def _read_peaks(lines, start, spec_size):
    """Collect the peak list that begins at ``lines[start]``.

    Consumes whitespace-separated "<m/z> <intensity> ..." lines until a line
    containing 'END IONS' (case-insensitive) or the end of ``lines``.
    Whitespace-only lines are skipped and do not count as peaks.

    Each m/z is multiplied by 10 and rounded to an integer bin; each
    intensity is transformed with ``sqrt(intensity + 1)``. Only bins with
    ``0 < bin < spec_size`` are kept, and consecutive peaks that fall into
    the same bin are merged by keeping the larger value.

    Returns:
        ``(next_index, bins, values, num_peaks, max_bin)`` where
        ``next_index`` is the index of the 'END IONS' line (or
        ``len(lines)``), ``num_peaks`` counts every peak line read (kept or
        not), and ``max_bin`` is the largest bin seen (0 if none).
    """
    bins = []
    values = []
    num_peaks = 0
    max_bin = 0
    pos = start
    total = len(lines)
    while pos < total and 'END IONS' not in lines[pos].upper():
        fields = lines[pos].split()
        pos += 1
        if not fields:
            continue
        num_peaks += 1
        moz = round(float(fields[0]) * 10)
        # +1 keeps the square root away from zero.
        intensity = math.sqrt(float(fields[1]) + 1.0)
        if moz > max_bin:
            max_bin = moz
        if 0 < moz < spec_size:
            if bins and bins[-1] == moz:
                values[-1] = max(intensity, values[-1])
            else:
                bins.append(moz)
                values.append(intensity)
    return pos, bins, values, num_peaks, max_bin


def preprocess_mgfs(mgf_dir, out_dir):
    """Convert the spectra in a directory of MGF files into ``.npy`` arrays.

    Every ``*.mgf`` file in ``mgf_dir`` is scanned for PEPMASS / CHARGE
    headers followed by a peak list. Each spectrum with at least 10 peak
    lines and at least one peak inside ``(0, spec_size)`` is min-max
    normalised and written to
    ``<out_dir>/spectra/<index>-<mass>-<charge>.npy`` as a 2 x K array
    (row 0: bin indices, row 1: normalised intensities). ``spec_size`` is
    read from the project config (section 'input', key 'spec_size').

    Per-bin mean and standard deviation over all saved spectra are written
    to ``<out_dir>/means.npy`` and ``<out_dir>/stds.npy``; bins whose
    standard deviation is below 1e-7 get ``inf``.

    ``out_dir`` is deleted and recreated if it already exists.

    NOTE(review): ``<index>`` restarts at 0 for every input file, so two
    files could in principle produce the same output file name - confirm
    whether downstream code relies on this naming before changing it.
    NOTE(review): if no spectrum is saved at all, N is 0 and the saved
    means/stds contain NaN.

    Args:
        mgf_dir: Directory containing the input ``.mgf`` files.
        out_dir: Directory that receives the converted data.
    """
    mgf_files = verify_in_dir(mgf_dir, "mgf")
    create_out_dir(out_dir, exist_ok=False)
    print('reading {} files'.format(len(mgf_files)))

    spec_size = config.get_config(section='input', key='spec_size')

    # These counters are never updated here; they are kept only because the
    # final statistics report prints them.
    ch = np.zeros(20)
    modified = 0
    unmodified = 0
    unique_pep_set = set()

    summ = np.zeros(spec_size)     # per-bin sum of normalised intensities
    sq_sum = np.zeros(spec_size)   # per-bin sum of squared intensities
    N = 0                          # number of spectra in the running sums
    tot_count = 0
    max_moz = 0

    for mgf_file in mgf_files:
        print('Reading: {}'.format(mgf_file))
        with open(mgf_file, "r") as f:
            lines = f.readlines()
        n_lines = len(lines)

        count = lcount = 0
        # Reported in the per-file summary but never incremented.
        mass_ign = 0
        pep_len_ign = 0
        dup_ign = 0
        print('len of file: ' + str(n_lines))

        is_mw = is_charge = False
        prev = 0
        i = 0
        while i < n_lines:
            line = lines[i]
            i += 1

            if line.upper().startswith('BEGIN IONS'):
                # Do not let headers of a previous, incomplete spectrum
                # leak into this one.
                is_mw = is_charge = False
            if line.startswith('PEPMASS'):
                count += 1
                mass = float(re.findall(r"PEPMASS=([-+]?[0-9]*\.?[0-9]*)", line)[0])
                is_mw = True
            if is_mw and line.startswith('CHARGE'):
                l_charge = int(re.findall(r"CHARGE=([-+]?[0-9]*\.?[0-9]*)", line)[0])
                is_charge = True
                # presumably converts precursor m/z to a neutral mass -
                # TODO confirm against config.PROTON's definition
                mass = (mass - config.PROTON) * l_charge
            if not (is_mw and is_charge):
                continue

            # Skip remaining header lines. Stop at END IONS as well, so a
            # spectrum without peaks cannot swallow the next spectrum.
            while (i < n_lines
                   and 'END IONS' not in lines[i].upper()
                   and not isfloat(re.split(' |\t|=', lines[i])[0])):
                i += 1

            i, spec_ind, spec_val, num_peaks, seen_max = _read_peaks(
                lines, i, spec_size)
            if seen_max > max_moz:
                max_moz = seen_max
            is_mw = is_charge = False

            # Too few peaks, or nothing inside the representable range.
            if num_peaks < 10 or not spec_ind:
                continue

            ind = np.array(spec_ind)
            val = np.array(spec_val)
            lowest = np.amin(val)
            span = np.amax(val) - lowest
            if span > 0:
                val = (val - lowest) / span
            else:
                # All retained peaks are equal: min-max scaling is
                # undefined, use 0 rather than propagating NaN.
                val = np.zeros_like(val)

            spec = np.array([ind, val])
            summ[ind] += val
            sq_sum[ind] += val**2
            N += 1

            spec_file_name = '{}-{}-{}.npy'.format(lcount, mass, l_charge)
            np.save(join(out_dir, 'spectra', spec_file_name), spec)
            lcount += 1
            tot_count += 1

            # Progress report roughly every 10% of the file.
            new = int((i / n_lines) * 100)
            if new >= prev + 10:
                print('count: ' + str(lcount))
                print(str(new) + '%')
                prev = new

        print('In current file, read {} out of {}'.format(lcount, count))
        print("Ignored: large mass: {}, pep len: {}, dup: {}".format(mass_ign, pep_len_ign, dup_ign))
        print('overall running count: ' + str(tot_count))
        print('max moz: ' + str(max_moz))

    print("Statistics:")
    print("Charge distribution:")
    print(ch)
    print("Modified:\t{}".format(modified))
    print("Unmodified:\t{}".format(unmodified))
    print("Unique Peptides:\t{}".format(len(unique_pep_set)))
    print("Sum: {}".format(summ))
    print("Sum-Squared: {}".format(sq_sum))
    print("N: {}".format(N))
    means = summ / N
    print("mean: {}".format(means))
    # Rounding can push E[x^2] - E[x]^2 slightly below zero; clamp so the
    # square root gives 0 (replaced by inf below) instead of NaN.
    variances = np.maximum((sq_sum / N) - means**2, 0.0)
    stds = np.sqrt(variances)
    stds[stds < 0.0000001] = float("inf")
    print("std: {}".format(stds))
    np.save(join(out_dir, 'means.npy'), means)
    np.save(join(out_dir, 'stds.npy'), stds)