|
|
import multiprocessing as mp |
|
|
import os |
|
|
import pickle |
|
|
|
|
|
import crick |
|
|
import loompy as lp |
|
|
import numpy as np |
|
|
from loguru import logger |
|
|
|
|
|
|
|
|
def get_filename() -> str:
    """Return the log file name for this script, ``<name>.log``.

    ``<name>`` is the base name of ``__file__`` cut at its first dot.
    """
    script_name = os.path.basename(__file__)
    stem, _, _ = script_name.partition('.')
    return "{}.log".format(stem)
|
|
|
|
|
|
|
|
def set_log() -> None:
    """Add a loguru file sink at ``../log/<name>``, with the name from get_filename()."""
    # The sink path is relative, exactly as written here.
    logger.add(f'../log/{get_filename()}')
|
|
|
|
|
|
|
|
# Attach the log file sink as soon as the module is executed.
set_log()

# Directory that is listed for input ``.loom`` files.
home_path = "/scDifformer/data/240412_test/looms"
# Directory that receives the pickled digest dictionaries.
outdir = "/scDifformer/data/240412_test/tdigest/"
|
|
|
|
|
|
|
|
def nonzero_median_digests(file_path: str) -> None:
    """Build per-gene t-digests of non-zero normalized expression for one loom file.

    Rows whose ``gene_type`` row attribute is ``"protein_coding"`` or ``"miRNA"``
    are selected. Each selected row is divided by the ``n_counts`` column
    attribute and multiplied by 10,000; zeros are replaced by NaN and the row is
    fed into its own ``crick`` TDigest. The resulting ``{ensembl_id: TDigest}``
    dict is pickled into ``outdir`` as
    ``<input base name without extension>.gene_median_digest_dict.pickle``.

    Args:
        file_path: Path of the ``.loom`` file to process.

    Returns:
        None. Any ``Exception`` is caught and logged together with
        ``file_path`` and the traceback instead of being raised, so a failing
        file does not propagate out of this call.
    """
    try:
        # Derive the output name from the base name only and swap just the
        # final extension, so ".loom" elsewhere in the path is left untouched.
        stem = os.path.splitext(os.path.basename(file_path))[0]
        file_name = f"{stem}.gene_median_digest_dict.pickle"
        # os.path.join works whether or not outdir ends with a separator.
        output_path = os.path.join(outdir, file_name)

        with lp.connect(file_path) as data:
            # Row indices of protein-coding and miRNA genes.
            gene_type = data.ra.gene_type
            coding_miRNA_loc = np.where(
                (gene_type == "protein_coding") | (gene_type == "miRNA")
            )[0]
            coding_miRNA_genes = data.ra["ensembl_id"][coding_miRNA_loc]

            # One digest per selected gene, in the same order as coding_miRNA_loc.
            median_digests = [
                crick.tdigest.TDigest() for _ in range(len(coding_miRNA_loc))
            ]

            # Number of selected rows consumed by previous batches; used to map
            # a row of the current batch back to its digest.
            last_view_row = 0

            for _ix, _selection, view in data.scan(items=coding_miRNA_loc, axis=0):
                subview = view.view[:, :]

                # Normalize by n_counts and scale by 10,000.
                subview_norm_array = subview[:, :] / subview.ca.n_counts * 10_000

                # Defensive: NaN cannot be stored in an integer array.
                if np.issubdtype(subview_norm_array.dtype, np.integer):
                    subview_norm_array = subview_norm_array.astype(np.float32)

                # Replace zeros with NaN before updating the digests.
                # NOTE(review): presumably crick's TDigest skips NaN so that only
                # non-zero values contribute -- confirm against the crick docs.
                nonzero_data = np.ma.masked_equal(subview_norm_array, 0.0).filled(np.nan)

                for offset, row in enumerate(nonzero_data):
                    median_digests[last_view_row + offset].update(row)

                last_view_row += view.shape[0]

        median_digest_dict = dict(zip(coding_miRNA_genes, median_digests))
        with open(output_path, "wb") as fp:
            pickle.dump(median_digest_dict, fp)
        logger.info(output_path)
    except Exception as e:
        # Name the input that failed (the script name alone cannot tell the
        # files of a multi-file run apart) and keep the traceback.
        logger.exception(f"{get_filename()} meet trouble with {file_path}: {e}")
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Gather the full path of every entry in home_path that ends with ".loom".
    loom_paths = []
    for entry in os.listdir(home_path):
        if entry.endswith('.loom'):
            loom_paths.append(os.path.join(home_path, entry))

    # Process the files in a worker pool, one call per file.
    with mp.Pool() as workers:
        workers.map(nonzero_median_digests, loom_paths)
|
|
|