# SPDX-FileCopyrightText: Copyright (c) 2023 - 2025 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from typing import Literal, Union

import torch
import torch.distributed as dist
from torch.distributed.device_mesh import DeviceMesh


@dataclass
class RingPassingConfig:
    """Configuration for ring-based communication operations.

    This class encapsulates all parameters needed for ring communication patterns,
    making it easier to pass consistent configurations between functions.

    Attributes:
        mesh_dim (int): Mesh dimension for the ring communication
        mesh_size (int): Size of the mesh for this communication
        ring_direction (str): Direction in which tensors travel around the ring
            ("forward" or "backward")
        communication_method (str): Method for exchanging data ("p2p" or "a2a")
    """

    VALID_COMM_METHODS = ["p2p", "a2a"]
    VALID_RING_DIRECTIONS = ["forward", "backward"]

    mesh_dim: int
    mesh_size: int
    ring_direction: Literal["forward", "backward"] = "forward"
    communication_method: Literal["p2p", "a2a"] = "a2a"

    def __post_init__(self) -> None:
        """Validate configuration parameters after initialization.

        Raises:
            ValueError: If an invalid communication method or ring direction
                is specified
        """
        if self.communication_method not in self.VALID_COMM_METHODS:
            raise ValueError(
                f"Invalid communication method: {self.communication_method}. "
                f"Must be one of {self.VALID_COMM_METHODS}"
            )
        if self.ring_direction not in self.VALID_RING_DIRECTIONS:
            raise ValueError(
                f"Invalid ring direction: {self.ring_direction}. "
                f"Must be one of {self.VALID_RING_DIRECTIONS}"
            )
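

# A minimal illustrative configuration: a backward ring over mesh dimension 0
# using batched point-to-point sends. The values below are assumptions chosen
# for the example, not defaults required by anything in this file.
#
#     ring_config = RingPassingConfig(
#         mesh_dim=0,
#         mesh_size=4,
#         ring_direction="backward",
#         communication_method="p2p",
#     )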


def perform_ring_iteration(
    tensor: torch.Tensor,
    mesh: DeviceMesh,
    ring_config: RingPassingConfig,
    recv_shape: Union[torch.Size, None] = None,
) -> torch.Tensor:
    """Perform a single step of ring collective communication.

    Each process sends its tensor to the next rank in the ring and receives a
    tensor from the previous rank, wrapping around from rank N-1 to rank 0.

    Args:
        tensor (torch.Tensor): The tensor to be sent in this ring communication step
        mesh (DeviceMesh): Device mesh that defines the distributed process group
        ring_config (RingPassingConfig): Configuration for the ring communication pattern
        recv_shape (torch.Size, optional): Shape of the incoming tensor when it
            differs from ``tensor``'s shape; by default the same shape is assumed

    Returns:
        torch.Tensor: The tensor received from the neighboring rank in the ring
    """
    dtype = tensor.dtype
    device = tensor.device

    # Get process group info
    local_group = mesh.get_group(ring_config.mesh_dim)
    local_rank = mesh.get_local_rank(ring_config.mesh_dim)
    local_size = dist.get_world_size(group=local_group)

    # Ring neighbors: send to the next rank, receive from the previous one,
    # wrapping around at the ends. P2P peers must be global ranks, while
    # all_to_all buffers are indexed by group-local rank, so track both.
    local_id_for_send = local_rank + 1 if local_rank < local_size - 1 else 0
    local_id_for_recv = local_rank - 1 if local_rank > 0 else local_size - 1
    id_for_send = dist.get_global_rank(group=local_group, group_rank=local_id_for_send)
    id_for_recv = dist.get_global_rank(group=local_group, group_rank=local_id_for_recv)
    if ring_config.ring_direction == "backward":
        # Swap neighbors to pass tensors the other way around the ring
        local_id_for_send, local_id_for_recv = local_id_for_recv, local_id_for_send
        id_for_send, id_for_recv = id_for_recv, id_for_send

    if not tensor.is_contiguous():
        tensor = tensor.contiguous()

    # Allocate the receive buffer, honoring recv_shape when the incoming
    # tensor's shape differs from the outgoing one
    if recv_shape is None:
        tensor_recv = torch.empty_like(tensor)
    else:
        tensor_recv = torch.empty(recv_shape, dtype=dtype, device=device)

    if ring_config.communication_method == "p2p":
        p2p_op_list = []
        torch.cuda.set_device(tensor.device)
        # Post the receive from the previous neighbor
        p2p_op_list.append(
            dist.P2POp(
                op=dist.irecv,
                tensor=tensor_recv,
                peer=id_for_recv,
                group=local_group,
            )
        )
        # Post the send to the next neighbor
        p2p_op_list.append(
            dist.P2POp(
                op=dist.isend,
                tensor=tensor,
                peer=id_for_send,
                group=local_group,
            )
        )
        # Launch both operations together, then wait for them to complete
        reqs = dist.batch_isend_irecv(p2p_op_list)
        for req in reqs:
            req.wait()
    elif ring_config.communication_method == "a2a":
        # Emulate the ring shift with all_to_all: every slot is a zero-size
        # placeholder except the ones addressed to the send and receive
        # neighbors. all_to_all tensor lists are indexed by group-local rank.
        all_to_all_send = [
            torch.empty(0, dtype=dtype, device=device) for _ in range(local_size)
        ]
        all_to_all_recv = [
            torch.empty(0, dtype=dtype, device=device) for _ in range(local_size)
        ]
        all_to_all_recv[local_id_for_recv] = tensor_recv
        all_to_all_send[local_id_for_send] = tensor

        # Perform exchange
        dist.all_to_all(all_to_all_recv, all_to_all_send, group=local_group)

    return tensor_recv
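

# A minimal runnable sketch of one ring step, assuming a CUDA/NCCL setup and a
# one-dimensional device mesh spanning every rank; the launch command, mesh
# dim name "ring", and payload contents are illustrative assumptions, e.g.:
#     torchrun --nproc_per_node=4 this_module.py
if __name__ == "__main__":
    from torch.distributed.device_mesh import init_device_mesh

    dist.init_process_group(backend="nccl")
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    torch.cuda.set_device(rank % torch.cuda.device_count())

    mesh = init_device_mesh("cuda", (world_size,), mesh_dim_names=("ring",))
    config = RingPassingConfig(mesh_dim=0, mesh_size=world_size)

    # Each rank sends a tensor filled with its own rank id; after one forward
    # ring step it should hold the previous rank's values.
    payload = torch.full((4,), float(rank), device="cuda")
    received = perform_ring_iteration(payload, mesh, config)
    expected = float((rank - 1) % world_size)
    assert torch.all(received == expected)
    print(f"rank {rank}: received payload from rank {int(expected)}")

    dist.destroy_process_group()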