import gc

import polars as pl
import scanpy as sc
from joblib import Parallel, delayed

# Read the concentration table only after every import has run, so the
# import section stays free of interleaved side effects (PEP 8 / E402).
concentrations = pl.read_csv("concentrations.csv")

# Lookup built from the first two columns of each CSV row: column 0 is the
# key, column 1 the value. Presumably drug name -> reference concentration
# label; confirm against the CSV header. If a key repeats, the last row wins.
drug_to_concentration = {
    row[0]: row[1]
    for row in concentrations.iter_rows()
}
def reduce(plate: int):
    """Filter one plate's h5ad file and write the reduced copy to disk.

    The plate is opened in backed (read-only) mode, restricted to the
    observations whose ``pass_filter`` is ``"full"`` and whose stringified
    ``drugname_drugconc`` equals the stringified ``drug_to_concentration``
    entry for their ``drug``, and the result is written to
    ``../Data/h5ad/reduced/plate{plate}.h5ad``.

    Args:
        plate: Plate number used to build the input and output file names.

    Returns:
        The number of observations (cells) kept after filtering, or ``None``
        if any step failed. Failures are reported on stdout and never raised,
        so one bad plate does not abort the whole parallel run.
    """
    try:
        print(f"Plate: {plate}")
        adata = sc.read_h5ad(
            f"../Data/h5ad/h5ad/plate{plate}_filt_Vevo_Tahoe100M_WServicesFrom_ParseGigalab.h5ad",
            backed="r",
        )
        print(f"Loaded: {plate}")

        obs = adata.obs
        passed_qc = obs["pass_filter"] == "full"
        # A drug missing from the mapping raises KeyError, which fails the
        # whole plate (handled below) rather than silently dropping cells.
        expected_conc = obs["drug"].map(
            lambda drug: drug_to_concentration[drug]
        ).astype(str)
        at_expected_conc = obs["drugname_drugconc"].astype(str) == expected_conc
        subset = adata[passed_qc & at_expected_conc]
        print(f"Filtered: {plate}")

        subset.write_h5ad(f"../Data/h5ad/reduced/plate{plate}.h5ad")
        print(f"Wrote: {plate}")

        # n_obs is the number of rows (cells) that survived the filter;
        # n_vars (genes) is untouched by row filtering and is not a cell count.
        cells = subset.n_obs

        # Drop the large objects before returning so the worker's memory is
        # released ahead of the next plate.
        del obs, passed_qc, expected_conc, at_expected_conc, subset, adata
        gc.collect()
        return cells
    except Exception:
        import traceback

        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        print(f"ERROR loading {plate}")
        # Report the actual cause; previously the exception was discarded.
        print(traceback.format_exc())
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        return None
# Process plates 1 through 14 with four parallel workers; each entry of
# `results` is whatever reduce() returned for that plate, in plate order.
plate_numbers = range(1, 14 + 1)
pending = (delayed(reduce)(plate_number) for plate_number in plate_numbers)
runner = Parallel(n_jobs=4)
results = runner(pending)
print(results)