| import json |
| import logging |
| import os |
| import pickle |
| from pathlib import Path |
| from typing import Any, List |
|
|
| import anndata |
| import dill |
| import matplotlib as mpl |
| import matplotlib.pyplot as plt |
| import numpy as np |
| import pandas as pd |
| import yaml |
| from anndata import AnnData |
| from Bio.SeqIO.FastaIO import SimpleFastaParser |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
def create_dirs(paths: List) -> None:
    """Create every directory in `paths` if it does not already exist.

    Parameters
    ----------
    paths : List
        Directory paths (str or Path) to create.
    """
    for path in paths:
        # BUGFIX: os.mkdir failed when parent dirs were missing and the
        # exists/mkdir pair was race-prone; makedirs with exist_ok=True
        # creates parents and is idempotent.
        os.makedirs(path, exist_ok=True)
|
|
def save(path: Path, data: object, ignore_ext: bool = False) -> Path:
    """Saves data to this path. Extension and saving function is determined from the type.
    If the correct extension was already in the path its also ok.
    At the moment we handle:
    - pyplot figures -> .pdf
    - dictionaries -> .yaml
    - list -> .yaml
    - numpy -> .npy
    - pandas dataframes -> .tsv
    - anndata -> .h5ad
    - strings -> .txt
    - _anything else_ -> .p (pickled with `dill`)
    Parameters
    ----------
    path : Path
        The full path to save to
    data: object
        Data to save
    ignore_ext : bool
        Whether to ignore adding the normal expected extension
    Returns
    -------
    Path
        The final path to the file
    """
    if not isinstance(path, Path):
        path = Path(path)

    # Make sure the target directory exists before any writer runs.
    path.parent.mkdir(parents=True, exist_ok=True)

    def make_path(p: Path, ext: str) -> Path:
        """If the path doesn't end with the given extension add the extension to the path.
        Parameters
        ----------
        p : Path
            The path
        ext : str
            The expected extension
        Returns
        -------
        Path
            The fixed path
        """
        if not ignore_ext and not p.name.endswith(ext):
            return p.parent.joinpath(f"{p.name}{ext}")
        return p

    if isinstance(data, mpl.figure.Figure):
        path = make_path(path, ".pdf")
        data.savefig(path)
        # Close the figure so repeated saves don't accumulate open figures.
        plt.close(data)

    elif isinstance(data, (dict, list)):
        # dicts and lists share the same YAML serialization.
        path = make_path(path, ".yaml")
        with open(path, "w") as fp:
            yaml.dump(data, fp)

    elif isinstance(data, np.ndarray):
        path = make_path(path, ".npy")
        np.save(path, data)

    elif isinstance(data, pd.DataFrame):
        path = make_path(path, ".tsv")
        data.to_csv(path, sep="\t")

    elif isinstance(data, anndata.AnnData):
        path = make_path(path, ".h5ad")
        # BUGFIX: previously this config was loaded unconditionally on every
        # save() call; it is only needed for the AnnData branch.
        annotation_path = os.path.dirname(os.path.abspath(__file__))
        with open(annotation_path + "/tcga_anndata_groupings.yaml", 'r') as stream:
            tcga_annotations = yaml.safe_load(stream)
        # h5ad can't store datetime columns; serialize them as date strings.
        for date_col in set(tcga_annotations['anndata']['obs']['datetime_columns']) & set(data.obs.columns):
            if "datetime" in data.obs[date_col].dtype.name:
                data.obs[date_col] = data.obs[date_col].dt.strftime("%Y-%m-%d")
            else:
                logger.info(f"Column {date_col} in obs should be a date but isnt formatted as one.")
        data.write(path)

    elif isinstance(data, str):
        path = make_path(path, ".txt")
        with open(path, "w") as fp:
            fp.write(data)

    else:
        path = make_path(path, ".p")
        # BUGFIX: use a context manager so the file handle is closed
        # (previously `open()` was passed inline and leaked).
        with open(path, "wb") as fp:
            dill.dump(data, fp)
    return path
|
|
|
|
|
|
| def _resolve_path(path: Path) -> Path: |
| """Given a path, will try to resolve it in multiple ways: |
| |
| 1. Is it a path to a S3 bucket? |
| 2. Is it a global/local file that exists? |
| 3. Is it path that is a prefix to a file that is unique? |
| |
| Parameters |
| ---------- |
| path : Path |
| The path |
| |
| Returns |
| ------- |
| Path |
| The global resolved file. |
| |
| Raises |
| ------ |
| FileNotFoundError |
| If the file doesn't exists or if there are multiple files that match the glob. |
| """ |
| if not path.name.startswith("/"): |
| path = path.expanduser().resolve() |
|
|
| |
| if path.exists(): |
| return path |
|
|
| |
| glob_name = path.name if path.name.endswith("*") else path.name + "*" |
| paths = list(path.parent.glob(glob_name)) |
| if len(paths) == 1: |
| return paths[0] |
|
|
| raise FileNotFoundError( |
| f"Was trying to resolve path\n\t{path}*\nbut was ambigious because there are no or multiple files that fit the glob." |
| ) |
|
|
| def _to_int_string(element: Any) -> str: |
| """Casts a number to a fixed formatted string that's nice categoriazebale. |
| |
| Parameters |
| ---------- |
| element : Any |
| The number, float or int |
| |
| Returns |
| ------- |
| str |
| Either the number formatted as a string or the original input if it |
| didn't work |
| """ |
| try: |
| fl = float(element) |
| return f"{fl:0.0f}" |
| except: |
| return element |
|
|
def cast_anndata(ad: AnnData) -> None:
    """Fixes the data-type in the `.obs` and `.var` DataFrame columns of an
    AnnData object. __Works in-place__. Currently does the following:

    1.1. Enforces numerical-categorical `.obs` columns
    1.2. Makes all other `.obs` columns categoricals
    1.3. Makes date-time `.obs` columns, non-categorical pandas `datetime64`
    1.4. Enforces real string `.obs` columns, to be strings not categoricals
    1.5. Enforces some numerical `.obs` columns

    Configuration for which column belongs in which group is configured in
    `/transforna/utils/ngs_annotations.yaml` in this repository.

    Parameters
    ----------
    ad : AnnData
        The AnnData object
    """
    # Column groupings live in a YAML file next to this module.
    annotation_path = os.path.dirname(os.path.abspath(__file__))
    with open(annotation_path + "/tcga_anndata_groupings.yaml", 'r') as stream:
        tcga_annotations = yaml.safe_load(stream)

    # 1.1 Numbers that are really categories: normalize the formatting first
    # so e.g. 1 and "1.0" end up in the same category.
    # NOTE: these intersections are sets, not lists (annotation fixed).
    numerical_categorical_columns: set = set(tcga_annotations['anndata']['obs']['numerical_categorical_columns']) & set(
        ad.obs.columns
    )
    for column in numerical_categorical_columns:
        ad.obs[column] = ad.obs[column].apply(_to_int_string).astype("U").astype("category")

    # 1.2 Every remaining string column becomes categorical.
    ad.strings_to_categoricals()

    # 1.3 Datetime columns; unparseable columns fall back to plain strings.
    datetime_columns: set = set(tcga_annotations['anndata']['obs']['datetime_columns']) & set(ad.obs.columns)
    for column in datetime_columns:
        try:
            ad.obs[column] = pd.to_datetime(ad.obs[column]).astype("datetime64[ns]")
        except ValueError as e:
            # BUGFIX: `warning(...)` was an undefined name and raised a
            # NameError whenever parsing failed; use the module logger.
            logger.warning(
                f"""to_datetime error (parsing "unparseable"):\n {e}\nColumn
                {column} will be set as string not as datetime."""
            )
            ad.obs[column] = ad.obs[column].astype("string")

    # 1.4 Columns that must stay free-form strings, not categoricals.
    string_columns: set = set(tcga_annotations['anndata']['obs']['string_columns']) & set(ad.obs.columns)
    for column in string_columns:
        ad.obs[column] = ad.obs[column].astype("string")

    # 1.5 Numeric columns; unparseable entries become NaN.
    numerical_columns: set = set(tcga_annotations['anndata']['obs']['numerical_columns']) & set(ad.obs.columns)
    for column in numerical_columns:
        ad.obs[column] = pd.to_numeric(ad.obs[column], errors="coerce")

    # Boolean `.var` columns: missing values mean False.
    # BUGFIX: avoid chained-assignment `fillna(..., inplace=True)` on the
    # selected Series (unreliable on modern pandas); assign back explicitly.
    boolean_columns: set = set(tcga_annotations['anndata']['var']['boolean_columns']) & set(ad.var.columns)
    for column in boolean_columns:
        ad.var[column] = ad.var[column].fillna(False).astype(bool)
|
|
|
def load(path: str, ext: str = None, **kwargs):
    """Loads the given filepath.

    This will use the extension of the filename to determine what to use for
    reading (if not overwritten). Most common use-case:

    At the moment we handle:

    - pickled objects (.p)
    - numpy objects (.npy)
    - dataframes (.csv, .tsv)
    - json files (.json)
    - yaml files (.yaml)
    - anndata files (.h5ad)
    - excel files (.xlsx)
    - text (.txt)
    - fasta files (.fa)

    Parameters
    ----------
    path : str
        The file-name of the cached file, without extension. (Or path)
        The file-name can be a glob match e.g. `/data/something/LC__*__21.7.2.*`
        which matches the everything with anything filling the stars. This only
        works if there is only one match. So this is shortcut if you do not know
        the full name but you know there is only one.
    ext : str, optional
        The extension to assume, ignoring the actual extension. E.g. loading
        "tsv" for a "something.csv" file with tab-limits, by default None

    Returns
    -------
    Whatever is in the saved file.

    Raises
    ------
    FileNotFoundError
        If a given path doesn't exist or doesn't give a unqiue file path.
    NotImplementedError
        Trying to load a file with an extension we do not have loading code for.
    """
    path = _resolve_path(Path(path))

    # Infer the extension from the resolved file name unless overridden.
    if ext is None:
        ext = path.suffix[1:]

    if ext == "p":
        # BUGFIX: use context managers so file handles are closed
        # (previously `open()` was passed inline and leaked).
        with open(path, "rb") as fp:
            return pickle.load(fp)

    elif ext == "npy":
        return np.load(path)

    elif ext == "tsv":
        return pd.read_csv(path, sep="\t", **kwargs)

    elif ext == "csv":
        return pd.read_csv(path, **kwargs)

    elif ext == "json":
        with open(path) as fp:
            return json.load(fp)

    elif ext == "yaml":
        with open(path) as fp:
            return yaml.load(fp, Loader=yaml.SafeLoader)

    elif ext == "h5ad":
        ad = anndata.read_h5ad(path)
        # Normalize obs/var dtypes right after reading.
        cast_anndata(ad)
        return ad

    elif ext == "xlsx":
        return pd.read_excel(path, **kwargs)

    elif ext == "txt":
        with open(path, "r") as text_file:
            return text_file.read()

    elif ext == "fa":
        # Only the sequences are kept; record identifiers are discarded
        # (previously collected into an unused list).
        with open(path) as fasta_file:
            seqs = [sequence for _, sequence in SimpleFastaParser(fasta_file)]
        return pd.DataFrame({'Sequences': seqs})
    else:
        raise NotImplementedError(f"No loader implemented for extension '{ext}'")
|
|