"""Build per-gene t-digests of nonzero, count-normalized expression values.

Every ``.loom`` file found in ``home_path`` is processed in a worker process;
each worker pickles a ``{ensembl_id: TDigest}`` dictionary into ``outdir``.
"""
import multiprocessing as mp
import os
import pickle

import crick
import loompy as lp
import numpy as np
from loguru import logger


def get_filename() -> str:
    """Return the log file name for this script.

    The name is this file's base name up to its first ``.``, followed by
    ``.log``.
    """
    script_name = os.path.basename(__file__)
    return "{}.log".format(script_name.split('.')[0])


def set_log() -> None:
    """Register a loguru file sink under ``../log/`` (relative to the CWD)."""
    filename = get_filename()
    logger.add(f'../log/{filename}')


# Executed at import time so that the sink is registered in whichever process
# imports this module.
set_log()

home_path = '/scDifformer/data/240412_test/looms'
outdir = r'/scDifformer/data/240412_test/tdigest/'


def nonzero_median_digests(file_path: str) -> None:
    """Compute per-gene t-digests for one loom file and pickle them.

    Rows whose ``gene_type`` row attribute is ``"protein_coding"`` or
    ``"miRNA"`` are selected. Each value is divided by the ``n_counts`` column
    attribute and multiplied by 10,000; zeros are then replaced by NaN before
    the row is fed to that gene's ``crick.tdigest.TDigest``. The resulting
    ``{ensembl_id: TDigest}`` dictionary is written to
    ``<outdir>/<input stem>.gene_median_digest_dict.pickle``.

    Args:
        file_path: Path to the input ``.loom`` file.

    Returns:
        None. The result is written to disk and the output path is logged.

    Note:
        Any exception is caught and logged (with traceback and the offending
        input path) rather than propagated, so that a single bad file does not
        abort a ``Pool.map`` over many files.
    """
    try:
        # Derive the output name from the base name only, so that directory
        # components of ``file_path`` can never leak into the file name.
        stem = os.path.splitext(os.path.basename(file_path))[0]
        file_name = stem + ".gene_median_digest_dict.pickle"
        output_path = os.path.join(outdir, file_name)

        with lp.connect(file_path) as data:
            # define coordinates of protein-coding or miRNA genes
            coding_miRNA_loc = np.where(
                (data.ra.gene_type == "protein_coding")
                | (data.ra.gene_type == "miRNA")
            )[0]
            coding_miRNA_genes = data.ra["ensembl_id"][coding_miRNA_loc]

            # initiate tdigests, one per selected gene
            median_digests = [
                crick.tdigest.TDigest() for _ in range(len(coding_miRNA_loc))
            ]

            # Running offset of the current batch's first row within
            # ``median_digests``; each batch's rows are assumed to follow the
            # order of ``coding_miRNA_loc`` (confirm against loompy's ``scan``
            # documentation).
            last_view_row = 0

            for (_ix, _selection, view) in data.scan(items=coding_miRNA_loc, axis=0):
                # NOTE: no per-cell filter is applied here; every column of the
                # batch is used.
                subview = view.view[:, :]

                # normalize by total counts per cell and multiply by 10,000 to
                # allocate bits to precision
                # (assumes ``n_counts`` holds each cell's total count -- TODO confirm)
                subview_norm_array = subview[:, :] / subview.ca.n_counts * 10_000

                # if integer, convert to float to prevent error with filling with nan
                if np.issubdtype(subview_norm_array.dtype, np.integer):
                    subview_norm_array = subview_norm_array.astype(np.float32)

                # mask zeroes from distribution tdigest by filling with nan
                # (relies on TDigest.update skipping NaN -- confirm in crick docs)
                nonzero_data = np.ma.masked_equal(subview_norm_array, 0.0).filled(np.nan)

                # update tdigests: row ``offset`` of this batch belongs to
                # digest ``last_view_row + offset``
                for offset, gene_values in enumerate(nonzero_data):
                    median_digests[last_view_row + offset].update(gene_values)

                last_view_row += view.shape[0]

        median_digest_dict = dict(zip(coding_miRNA_genes, median_digests))

        # Make sure the destination exists so the computed digests are not
        # lost to a missing directory.
        os.makedirs(outdir, exist_ok=True)
        with open(output_path, "wb") as fp:
            pickle.dump(median_digest_dict, fp)
        logger.info(output_path)
    except Exception as e:
        # Best-effort per file: record which input failed, with traceback.
        logger.exception(f"{get_filename()} meet trouble processing {file_path}: {e}")


if __name__ == '__main__':
    files = [
        os.path.join(home_path, f)
        for f in os.listdir(home_path)
        if f.endswith('.loom')
    ]
    with mp.Pool() as pool:
        pool.map(nonzero_median_digests, files)