| import gc |
| import os |
| from typing import Optional, TypeVar |
|
|
| import torch |
| import torch.distributed as dist |
|
|
| T = TypeVar("T") |
|
|
|
|
| def seed_all(seed: int): |
| """Seed all rng objects.""" |
| import random |
|
|
| import numpy as np |
|
|
| if seed < 0 or seed > 2**32 - 1: |
| raise ValueError(f"Seed {seed} is invalid. It must be on [0; 2^32 - 1]") |
| random.seed(seed) |
| np.random.seed(seed) |
| torch.manual_seed(seed) |
| |
| |
| torch.cuda.manual_seed_all(seed) |
|
|
|
|
| def is_distributed() -> bool: |
| return dist.is_available() and dist.is_initialized() |
|
|
|
|
| def get_node_rank() -> int: |
| return int(os.environ.get("NODE_RANK") or (get_global_rank() - get_local_rank()) // get_local_world_size()) |
|
|
|
|
| def get_world_size() -> int: |
| if is_distributed(): |
| return dist.get_world_size() |
| else: |
| return 1 |
|
|
|
|
| def get_local_world_size() -> int: |
| return int(os.environ.get("LOCAL_WORLD_SIZE") or 1) |
|
|
|
|
| def get_global_rank() -> int: |
| return int(os.environ.get("RANK") or dist.get_rank()) |
|
|
|
|
| def get_local_rank() -> int: |
| return int(os.environ.get("LOCAL_RANK") or 0) |
|
|
|
|
| def get_fs_local_rank() -> int: |
| """Get the local rank per filesystem, meaning that, regardless of the number of nodes, |
| if all ranks share the same filesystem then `get_fs_local_rank()` will be equivalent to `get_global_rank()`, |
| but if nodes do not share the same filesystem then `get_fs_local_rank()` will be equivalent to `get_local_rank()`. |
| """ |
| return int(os.environ.get("FS_LOCAL_RANK") or get_local_rank()) |
|
|
|
|
| def move_to_device(o: T, device: torch.device) -> T: |
| if isinstance(o, torch.Tensor): |
| return o.to(device) |
| elif isinstance(o, dict): |
| return {k: move_to_device(v, device) for k, v in o.items()} |
| elif isinstance(o, list): |
| return [move_to_device(x, device) for x in o] |
| elif isinstance(o, tuple): |
| return tuple((move_to_device(x, device) for x in o)) |
| else: |
| return o |
|
|
|
|
| def ensure_finite_(x: torch.Tensor, check_neg_inf: bool = True, check_pos_inf: bool = False): |
| """ |
| Modify ``x`` in place to replace ``float("-inf")`` with the minimum value of the dtype when ``check_neg_inf`` |
| is ``True`` and to replace ``float("inf")`` with the maximum value of the dtype when ``check_pos_inf`` is ``True``. |
| """ |
| if check_neg_inf: |
| x.masked_fill_(x == float("-inf"), torch.finfo(x.dtype).min) |
| if check_pos_inf: |
| x.masked_fill_(x == float("inf"), torch.finfo(x.dtype).max) |
|
|
|
|
| def get_default_device() -> torch.device: |
| if torch.cuda.is_available() and torch.cuda.is_initialized(): |
| return torch.device("cuda") |
| else: |
| return torch.device("cpu") |
|
|
|
|
| def barrier() -> None: |
| if is_distributed(): |
| dist.barrier() |
|
|
|
|
| def peak_gpu_memory(reset: bool = False) -> Optional[float]: |
| """ |
| Get the peak GPU memory usage in MB across all ranks. |
| Only rank 0 will get the final result. |
| """ |
| if not torch.cuda.is_available(): |
| return None |
|
|
| device = torch.device("cuda") |
| peak_mb = torch.cuda.max_memory_allocated(device) / 1000000 |
| if is_distributed(): |
| peak_mb_tensor = torch.tensor(peak_mb, device=device) |
| dist.reduce(peak_mb_tensor, 0, dist.ReduceOp.MAX) |
| peak_mb = peak_mb_tensor.item() |
|
|
| if reset: |
| |
| torch.cuda.reset_max_memory_allocated(device) |
|
|
| return peak_mb |
|
|
|
|
| V = TypeVar("V", bool, int, float) |
|
|
|
|
| def synchronize_value(value: V, device: torch.device) -> V: |
| if dist.is_available() and dist.is_initialized(): |
| value_tensor = torch.tensor(value, device=device) |
| dist.broadcast(value_tensor, 0) |
| return value_tensor.item() |
| else: |
| return value |
|
|
|
|
| def synchronize_flag(flag: bool, device: torch.device) -> bool: |
| return synchronize_value(flag, device) |
|
|
|
|
| def gc_cuda(): |
| gc.collect() |
| if torch.cuda.is_available(): |
| torch.cuda.empty_cache() |
|
|