# Copied from https://github.com/NVIDIA/DeepLearningExamples/blob/master/PyTorch/LanguageModeling/Transformer-XL/pytorch/utils/distributed.py
# Copyright (c) 2019-2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from contextlib import contextmanager

import torch

def init_distributed(cuda):
    """
    Initializes distributed backend.

    :param cuda: (bool) if True initializes nccl backend, if False initializes
        gloo backend
    """
    world_size = int(os.environ.get('WORLD_SIZE', 1))
    distributed = (world_size > 1)
    if distributed:
        backend = 'nccl' if cuda else 'gloo'
        torch.distributed.init_process_group(backend=backend,
                                             init_method='env://')
        assert torch.distributed.is_initialized()
    return distributed
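
# Usage sketch (an illustration, not part of the original module): with the
# 'env://' init method, WORLD_SIZE, RANK, MASTER_ADDR, and MASTER_PORT must be
# present in the environment, as set up by launchers such as torchrun, e.g.:
#
#   torchrun --nproc_per_node=4 train.py
#
# and then inside a hypothetical train.py:
#
#   distributed = init_distributed(cuda=torch.cuda.is_available())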

def barrier():
    """
    Calls torch.distributed.barrier() if distributed is in use.
    """
    if torch.distributed.is_available() and torch.distributed.is_initialized():
        torch.distributed.barrier()

def get_rank():
    """
    Gets distributed rank or returns zero if distributed is not initialized.
    """
    if torch.distributed.is_available() and torch.distributed.is_initialized():
        rank = torch.distributed.get_rank()
    else:
        rank = 0
    return rank

def get_world_size():
    """
    Gets total number of distributed workers or returns one if distributed is
    not initialized.
    """
    if torch.distributed.is_available() and torch.distributed.is_initialized():
        world_size = torch.distributed.get_world_size()
    else:
        world_size = 1
    return world_size

def all_reduce_item(value, op='sum'):
    """
    All-reduces single scalar value if distributed is in use.
    """
    if torch.distributed.is_available() and torch.distributed.is_initialized():
        if op == 'sum' or op == 'mean':
            dop = torch.distributed.ReduceOp.SUM
        elif op == 'min':
            dop = torch.distributed.ReduceOp.MIN
        elif op == 'max':
            dop = torch.distributed.ReduceOp.MAX
        elif op == 'product':
            dop = torch.distributed.ReduceOp.PRODUCT
        else:
            raise RuntimeError('Unsupported reduce op')

        # NCCL reduces CUDA tensors; gloo reduces CPU tensors.
        backend = torch.distributed.get_backend()
        if backend == torch.distributed.Backend.NCCL:
            device = torch.device('cuda')
        elif backend == torch.distributed.Backend.GLOO:
            device = torch.device('cpu')
        else:
            raise RuntimeError('Unsupported distributed backend')

        tensor = torch.tensor(value, device=device)
        torch.distributed.all_reduce(tensor, dop)
        if op == 'mean':
            # Out-of-place division so integer inputs are promoted to float
            # rather than failing on in-place float division of an int tensor.
            tensor = tensor / get_world_size()
        ret = tensor.item()
    else:
        ret = value
    return ret
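
# Usage sketch (an illustration, not part of the original module): averaging a
# per-worker scalar, e.g. a validation loss, across all workers:
#
#   local_loss = 0.42  # hypothetical per-worker value
#   mean_loss = all_reduce_item(local_loss, op='mean')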

@contextmanager
def sync_workers():
    """
    Yields distributed rank and synchronizes all workers on exit.
    """
    rank = get_rank()
    yield rank
    barrier()
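
# Usage sketch (an illustration, not part of the original module): let only
# rank 0 perform I/O, then synchronize everyone before continuing:
#
#   with sync_workers() as rank:
#       if rank == 0:
#           torch.save(state, 'checkpoint.pt')  # hypothetical checkpoint path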