Spaces:
Build error
Build error
| import math | |
| import os | |
| import random | |
| import numpy as np | |
| import torch | |
| import torch.distributed as dist | |
| from torch.utils.data import DistributedSampler as _DistributedSampler | |
| def setup_seed(seed, cuda_deterministic=True): | |
| torch.manual_seed(seed) | |
| torch.cuda.manual_seed_all(seed) | |
| np.random.seed(seed) | |
| random.seed(seed) | |
| os.environ["PYTHONHASHSEED"] = str(seed) | |
| if cuda_deterministic: # slower, more reproducible | |
| torch.backends.cudnn.deterministic = True | |
| torch.backends.cudnn.benchmark = False | |
| else: # faster, less reproducible | |
| torch.backends.cudnn.deterministic = False | |
| torch.backends.cudnn.benchmark = True | |
| def worker_init_fn(worker_id, num_workers, rank, seed): | |
| # The seed of each worker equals to | |
| # num_worker * rank + worker_id + user_seed | |
| worker_seed = num_workers * rank + worker_id + seed | |
| np.random.seed(worker_seed) | |
| random.seed(worker_seed) | |
| torch.manual_seed(worker_seed) | |
| def get_dist_info(): | |
| if dist.is_available() and dist.is_initialized(): | |
| rank = dist.get_rank() | |
| world_size = dist.get_world_size() | |
| else: | |
| rank = 0 | |
| world_size = 1 | |
| return rank, world_size | |
| def sync_random_seed(seed=None, device="cuda"): | |
| """Make sure different ranks share the same seed. | |
| All workers must call this function, otherwise it will deadlock. | |
| This method is generally used in `DistributedSampler`, | |
| because the seed should be identical across all processes | |
| in the distributed group. | |
| In distributed sampling, different ranks should sample non-overlapped | |
| data in the dataset. Therefore, this function is used to make sure that | |
| each rank shuffles the data indices in the same order based | |
| on the same seed. Then different ranks could use different indices | |
| to select non-overlapped data from the same data list. | |
| Args: | |
| seed (int, Optional): The seed. Default to None. | |
| device (str): The device where the seed will be put on. | |
| Default to 'cuda'. | |
| Returns: | |
| int: Seed to be used. | |
| """ | |
| if seed is None: | |
| seed = np.random.randint(2**31) | |
| assert isinstance(seed, int) | |
| rank, world_size = get_dist_info() | |
| if world_size == 1: | |
| return seed | |
| if rank == 0: | |
| random_num = torch.tensor(seed, dtype=torch.int32, device=device) | |
| else: | |
| random_num = torch.tensor(0, dtype=torch.int32, device=device) | |
| dist.broadcast(random_num, src=0) | |
| return random_num.item() | |
| class DistributedSampler(_DistributedSampler): | |
| def __init__( | |
| self, | |
| dataset, | |
| num_replicas=None, # world_size | |
| rank=None, # local_rank | |
| shuffle=True, | |
| seed=0, | |
| ): | |
| super().__init__(dataset, num_replicas=num_replicas, rank=rank, shuffle=shuffle) | |
| # In distributed sampling, different ranks should sample | |
| # non-overlapped data in the dataset. Therefore, this function | |
| # is used to make sure that each rank shuffles the data indices | |
| # in the same order based on the same seed. Then different ranks | |
| # could use different indices to select non-overlapped data from the | |
| # same data list. | |
| self.seed = sync_random_seed(seed) | |
| def __iter__(self): | |
| # deterministically shuffle based on epoch | |
| if self.shuffle: | |
| g = torch.Generator() | |
| # When :attr:`shuffle=True`, this ensures all replicas | |
| # use a different random ordering for each epoch. | |
| # Otherwise, the next iteration of this sampler will | |
| # yield the same ordering. | |
| g.manual_seed(self.epoch + self.seed) | |
| indices = torch.randperm(len(self.dataset), generator=g).tolist() | |
| else: | |
| indices = torch.arange(len(self.dataset)).tolist() | |
| # add extra samples to make it evenly divisible | |
| # in case that indices is shorter than half of total_size | |
| indices = (indices * math.ceil(self.total_size / len(indices)))[: self.total_size] | |
| assert len(indices) == self.total_size | |
| # subsample | |
| indices = indices[self.rank : self.total_size : self.num_replicas] | |
| assert len(indices) == self.num_samples | |
| return iter(indices) | |