| | import random |
| | import warnings |
| | from importlib.util import find_spec |
| | from typing import Callable |
| |
|
| | import numpy as np |
| | import torch |
| | from omegaconf import DictConfig |
| |
|
| | from .logger import RankedLogger |
| | from .rich_utils import enforce_tags, print_config_tree |
| |
|
| | log = RankedLogger(__name__, rank_zero_only=True) |
| |
|
| |
|
| | def extras(cfg: DictConfig) -> None: |
| | """Applies optional utilities before the task is started. |
| | |
| | Utilities: |
| | - Ignoring python warnings |
| | - Setting tags from command line |
| | - Rich config printing |
| | """ |
| |
|
| | |
| | if not cfg.get("extras"): |
| | log.warning("Extras config not found! <cfg.extras=null>") |
| | return |
| |
|
| | |
| | if cfg.extras.get("ignore_warnings"): |
| | log.info("Disabling python warnings! <cfg.extras.ignore_warnings=True>") |
| | warnings.filterwarnings("ignore") |
| |
|
| | |
| | if cfg.extras.get("enforce_tags"): |
| | log.info("Enforcing tags! <cfg.extras.enforce_tags=True>") |
| | enforce_tags(cfg, save_to_file=True) |
| |
|
| | |
| | if cfg.extras.get("print_config"): |
| | log.info("Printing config tree with Rich! <cfg.extras.print_config=True>") |
| | print_config_tree(cfg, resolve=True, save_to_file=True) |
| |
|
| |
|
| | def task_wrapper(task_func: Callable) -> Callable: |
| | """Optional decorator that controls the failure behavior when executing the task function. |
| | |
| | This wrapper can be used to: |
| | - make sure loggers are closed even if the task function raises an exception (prevents multirun failure) |
| | - save the exception to a `.log` file |
| | - mark the run as failed with a dedicated file in the `logs/` folder (so we can find and rerun it later) |
| | - etc. (adjust depending on your needs) |
| | |
| | Example: |
| | ``` |
| | @utils.task_wrapper |
| | def train(cfg: DictConfig) -> Tuple[dict, dict]: |
| | |
| | ... |
| | |
| | return metric_dict, object_dict |
| | ``` |
| | """ |
| |
|
| | def wrap(cfg: DictConfig): |
| | |
| | try: |
| | metric_dict, object_dict = task_func(cfg=cfg) |
| |
|
| | |
| | except Exception as ex: |
| | |
| | log.exception("") |
| |
|
| | |
| | |
| | |
| | |
| | raise ex |
| |
|
| | |
| | finally: |
| | |
| | log.info(f"Output dir: {cfg.paths.run_dir}") |
| |
|
| | |
| | if find_spec("wandb"): |
| | import wandb |
| |
|
| | if wandb.run: |
| | log.info("Closing wandb!") |
| | wandb.finish() |
| |
|
| | return metric_dict, object_dict |
| |
|
| | return wrap |
| |
|
| |
|
| | def get_metric_value(metric_dict: dict, metric_name: str) -> float: |
| | """Safely retrieves value of the metric logged in LightningModule.""" |
| |
|
| | if not metric_name: |
| | log.info("Metric name is None! Skipping metric value retrieval...") |
| | return None |
| |
|
| | if metric_name not in metric_dict: |
| | raise Exception( |
| | f"Metric value not found! <metric_name={metric_name}>\n" |
| | "Make sure metric name logged in LightningModule is correct!\n" |
| | "Make sure `optimized_metric` name in `hparams_search` config is correct!" |
| | ) |
| |
|
| | metric_value = metric_dict[metric_name].item() |
| | log.info(f"Retrieved metric value! <{metric_name}={metric_value}>") |
| |
|
| | return metric_value |
| |
|
| |
|
| | def set_seed(seed: int): |
| | if seed < 0: |
| | seed = -seed |
| | if seed > (1 << 31): |
| | seed = 1 << 31 |
| |
|
| | random.seed(seed) |
| | np.random.seed(seed) |
| | torch.manual_seed(seed) |
| |
|
| | if torch.cuda.is_available(): |
| | torch.cuda.manual_seed(seed) |
| | torch.cuda.manual_seed_all(seed) |
| |
|
| | if torch.backends.cudnn.is_available(): |
| | torch.backends.cudnn.deterministic = True |
| | torch.backends.cudnn.benchmark = False |
| |
|