| import os | |
| import sys | |
| import wget | |
| import requests | |
| import re | |
| import argparse | |
| from types import GeneratorType, ModuleType | |
| from typing import Union, Tuple | |
| import subprocess | |
| from pathlib import PosixPath, Path | |
| import importlib as im | |
| import json | |
| import pickle | |
| from pydantic import * | |
| from typing import List | |
| import pandas as pd | |
| import numpy as np | |
| from IPython.display import display | |
| import torch | |
| from tqdm import tqdm | |
| from sklearn.metrics import r2_score | |
| from .config import settings, output, data_final, models | |
def preprocess_genex(genex_data: pd.DataFrame, settings: dict) -> pd.DataFrame:
    """Apply the preprocessing step configured in ``settings`` to expression data.

    Recognized ``settings["data"]["preprocess"]`` configurations:
      - ``{"type": "log", "delta": d}``       -> elementwise ``log(x + d)``
      - ``{"type": "binary", "threshold": t}`` -> 1.0 where ``x > t`` else 0.0
      - ``{"type": "ceiling", "ceiling": c}``  -> elementwise ``min(c, x)``
    Any other type — or a falsy/absent ``preprocess`` entry — returns the
    input unchanged.

    Args:
        genex_data: gene-expression matrix (numeric DataFrame).
        settings: configuration dict with a ``"data"`` section.

    Returns:
        pd.DataFrame: the transformed (or original) expression data.
    """
    if not settings["data"].get("preprocess", False):
        return genex_data

    preproc_dict = settings["data"]["preprocess"]
    preproc_type = preproc_dict["type"]
    # Vectorized equivalents of the elementwise operations; `applymap` with a
    # Python lambda is deprecated (pandas >= 2.1) and far slower.
    if preproc_type == "log":
        return np.log(genex_data + preproc_dict["delta"])
    if preproc_type == "binary":
        # Strictly-greater comparison, mapped to {0.0, 1.0}.
        return (genex_data > preproc_dict["threshold"]).astype(float)
    if preproc_type == "ceiling":
        return genex_data.clip(upper=preproc_dict["ceiling"])
    # Unknown preprocess type: pass data through untouched.
    return genex_data
def get_args(
    data_dir: DirectoryPath = data_final / "transformer" / "seq",
    train_data: FilePath = "all_seqs_train.txt",
    eval_data: FilePath = None,
    test_data: FilePath = "all_seqs_test.txt",
    output_dir: DirectoryPath = models / "transformer" / "language-model",
    model_name: str = None,
    pretrained_model: FilePath = None,
    tokenizer_dir: DirectoryPath = None,
    log_offset: int = None,
    preprocessor: str = None,
    filter_empty: bool = False,
    hyperparam_search_metrics: List[str] = None,
    hyperparam_search_trials: int = None,
    transformation: str = None,
    output_mode: str = None,
) -> argparse.Namespace:
    """Use Python's ArgumentParser to create a namespace from (optional) user input.

    Args:
        data_dir ([type], optional): Base location of data files. Defaults to data_final/'transformer'/'seq'.
        train_data (str, optional): Name of train data file in `data_dir`. Defaults to 'all_seqs_train.txt'.
        eval_data (str, optional): Name of eval data file in `data_dir`. Defaults to None.
        test_data (str, optional): Name of test data file in `data_dir`. Defaults to 'all_seqs_test.txt'.
        output_dir ([type], optional): Location to save trained model. Defaults to models/'transformer'/'language-model'.
        model_name (Union[str, PosixPath], optional): Name of model.
        pretrained_model (Union[str, PosixPath], optional): path to config and weights for huggingface pretrained model.
        tokenizer_dir (Union[str, PosixPath], optional): path to config files for huggingface pretrained tokenizer.
        log_offset (int, optional): offset added to expression values before a log transform.
        preprocessor (str, optional): path to a pickled preprocessor file.
        filter_empty (bool, optional): Whether to filter out empty sequences.
            Necessary for kmer-based models; takes additional time.
        hyperparam_search_metrics (Union[list, str], optional): metrics for hyperparameter search.
        hyperparam_search_trials (int, optional): number of trials to run hyperparameter search.
        transformation (str, optional): how to transform data. Defaults to None.
        output_mode (str, optional): default output mode for model and data transformation. Defaults to None.

    Returns:
        argparse.Namespace: parsed arguments
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-w",
        "--warmstart",
        action="store_true",
        help="Whether to start with a saved checkpoint",
        default=False,
    )
    parser.add_argument("--num-embeddings", type=int, default=-1)
    parser.add_argument(
        "--data-dir",
        type=str,
        default=str(data_dir),
        help="Directory containing train/eval data. Defaults to data/final/transformer/seq",
    )
    parser.add_argument(
        "--train-data",
        type=str,
        default=train_data,
        help="Name of training data file. Will be added to the end of `--data-dir`.",
    )
    parser.add_argument(
        "--eval-data",
        type=str,
        default=eval_data,
        help="Name of eval data file. Will be added to the end of `--data-dir`.",
    )
    parser.add_argument(
        "--test-data",
        type=str,
        default=test_data,
        help="Name of test data file. Will be added to the end of `--data-dir`.",
    )
    parser.add_argument("--output-dir", type=str, default=str(output_dir))
    parser.add_argument(
        "--model-name",
        type=str,
        help='Name of model. Supported values are "roberta-lm", "roberta-pred", "roberta-pred-mean-pool", "dnabert-lm", "dnabert-pred", "dnabert-pred-mean-pool"',
        default=model_name,
    )
    parser.add_argument(
        "--pretrained-model",
        type=str,
        help="Directory containing config.json and pytorch_model.bin files for loading pretrained huggingface model",
        default=(str(pretrained_model) if pretrained_model else None),
    )
    parser.add_argument(
        "--tokenizer-dir",
        type=str,
        help="Directory containing necessary files to instantiate pretrained tokenizer.",
        # BUGFIX: str(None) would silently produce the literal string "None";
        # guard like --pretrained-model does.
        default=(str(tokenizer_dir) if tokenizer_dir else None),
    )
    parser.add_argument(
        "--log-offset",
        type=float,
        help="Offset to apply to gene expression values before log transform",
        default=log_offset,
    )
    parser.add_argument(
        "--preprocessor",
        type=str,
        help="Path to pickled preprocessor file",
        default=preprocessor,
    )
    parser.add_argument(
        "--filter-empty",
        help="Whether to filter out empty sequences.",
        default=filter_empty,
        action="store_true",
    )
    parser.add_argument(
        "--tissue-subset", default=None, help="Subset of tissues to use", nargs="*"
    )
    parser.add_argument("--hyperparameter-search", action="store_true", default=False)
    parser.add_argument("--ntrials", default=hyperparam_search_trials, type=int)
    parser.add_argument("--metrics", default=hyperparam_search_metrics, nargs="*")
    parser.add_argument("--direction", type=str, default="minimize")
    parser.add_argument(
        "--nshards",
        type=int,
        default=None,
        help="Number of shards to divide data into; only the first is kept.",
    )
    parser.add_argument(
        "--nshards-eval",
        type=int,
        default=None,
        help="Number of shards to divide eval data into.",
    )
    parser.add_argument(
        "--threshold",
        type=float,
        default=None,
        help="Minimum value for filtering gene expression values.",
    )
    parser.add_argument(
        "--transformation",
        type=str,
        default=transformation,
        help='How to transform the data. Options are "log", "boxcox"',
    )
    parser.add_argument(
        "--freeze-base",
        action="store_true",
        help="Freeze the pretrained base of the model",
    )
    parser.add_argument(
        "--output-mode",
        type=str,
        help='Output mode for model: {"regression", "classification"}',
        default=output_mode,
    )
    parser.add_argument(
        "--learning-rate",
        type=float,
        help="Learning rate for training. Default None",
        default=None,
    )
    parser.add_argument(
        "--num-train-epochs",
        type=int,
        help="Number of epochs to train for",
        default=None,
    )
    parser.add_argument(
        "--search-metric",
        type=str,
        help="Metric to optimize in hyperparameter search",
        default=None,
    )
    parser.add_argument("--batch-norm", action="store_true", default=False)
    # Ignore unrecognized flags so this can coexist with other CLI consumers.
    args, unknown = parser.parse_known_args()
    # Resolve a relative pretrained-model path against the CWD (portable
    # absolute-path check instead of the POSIX-only startswith("/")).
    if args.pretrained_model and not Path(args.pretrained_model).is_absolute():
        args.pretrained_model = str(Path.cwd() / args.pretrained_model)
    args.data_dir = Path(args.data_dir)
    args.output_dir = Path(args.output_dir)
    args.train_data = _get_fpath_if_not_none(args.data_dir, args.train_data)
    args.eval_data = _get_fpath_if_not_none(args.data_dir, args.eval_data)
    args.test_data = _get_fpath_if_not_none(args.data_dir, args.test_data)
    args.preprocessor = Path(args.preprocessor) if args.preprocessor else None
    if args.tissue_subset is not None:
        if isinstance(args.tissue_subset, (int, str)):
            args.tissue_subset = [args.tissue_subset]
        # Convert numeric strings to ints; leave non-str entries (e.g. ints
        # supplied programmatically) untouched instead of crashing on .isnumeric.
        args.tissue_subset = [
            int(t) if isinstance(t, str) and t.isnumeric() else t
            for t in args.tissue_subset
        ]
    return args
def get_model_settings(
    settings: dict, args: argparse.Namespace = None, model_name: str = None
) -> dict:
    """Get the appropriate model settings from the dictionary `settings`.

    Merges, in increasing priority: the base-model settings (the name's first
    dash-component + "-base"), the named model's settings, and the data
    settings. Selected ``args`` fields (output_mode, tissue_subset,
    batch_norm) override the merged result.

    Args:
        settings: full settings dict with "models" and "data" sections.
        args: parsed CLI namespace; used for `model_name` fallback and overrides.
        model_name: explicit model name; defaults to ``args.model_name``.

    Returns:
        dict: the merged settings for this model.
    """
    if model_name is None:
        model_name = args.model_name
    base_model_name = model_name.split("-")[0] + "-base"
    base_model_settings = settings["models"].get(base_model_name, {})
    model_settings = settings["models"].get(model_name, {})
    data_settings = settings["data"]
    # BUGFIX: dict(**a, **b, **c) raises TypeError on duplicate keys; a dict
    # literal merge lets later sources override earlier ones as intended.
    settings = {**base_model_settings, **model_settings, **data_settings}
    if args is not None:
        if args.output_mode:
            settings["output_mode"] = args.output_mode
        if args.tissue_subset is not None:
            settings["num_labels"] = len(args.tissue_subset)
        if args.batch_norm:
            settings["batch_norm"] = args.batch_norm
    return settings
| def _get_fpath_if_not_none( | |
| dirpath: PosixPath, fpath: PosixPath | |
| ) -> Union[None, PosixPath]: | |
| if fpath: | |
| return dirpath / fpath | |
| return None | |
| def load_pickle(path: PosixPath) -> object: | |
| with path.open("rb") as f: | |
| obj = pickle.load(f) | |
| return obj | |