File size: 4,900 Bytes
5000658 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# SPDX-FileCopyrightText: Copyright (c) 2022-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import array
import struct
import sys
from contextlib import contextmanager
from typing import List, Tuple
from cuda import cudart
from cuda.cudart import cudaError_t
from ._utils import mpi_comm
from .mapping import Mapping
def _raise_if_error(error: cudaError_t):
    """Raise ``RuntimeError`` unless *error* is ``cudaSuccess``.

    Args:
        error: Status code returned by a CUDA runtime call.

    Raises:
        RuntimeError: Constructed with *error* as its only argument when
            the status differs from ``cudaError_t.cudaSuccess``.
    """
    if error == cudaError_t.cudaSuccess:
        return
    raise RuntimeError(error)
@contextmanager
def peer_access(mapping: Mapping):
    """Context manager that keeps peer access enabled for the ``with`` body.

    On entry it calls ``set_peer_access(mapping, True)``; on exit it calls
    ``set_peer_access(mapping, False)``, also when the body raises. If the
    enabling call itself raises, the disabling call is not made.

    Args:
        mapping: Rank layout forwarded unchanged to ``set_peer_access``.
    """
    set_peer_access(mapping, enabled=True)
    try:
        yield
    finally:
        # Always undo the enable, whatever happened inside the body.
        set_peer_access(mapping, enabled=False)
def set_peer_access(mapping: Mapping, enabled: bool = True):
    """Enable or disable CUDA peer access towards the same-host TP peers.

    Every rank in ``mapping.tp_group`` is visited. Ranks whose node rank
    differs from ``mapping.node_rank`` and the rank that maps to the caller's
    own local rank are skipped.

    Args:
        mapping: Rank layout; supplies ``local_rank``, ``node_rank``,
            ``tp_group``, ``get_local_rank`` and ``get_node_rank``.
        enabled: ``True`` to enable peer access, ``False`` to disable it.

    Raises:
        RuntimeError: If ``cudaDeviceCanAccessPeer`` fails or reports that
            access is not possible, or if the enable/disable call leaves an
            error other than "already enabled" / "not enabled".
    """
    src_node = mapping.local_rank

    # Statuses that mean "already in the requested state"; treated as success.
    tolerated = (
        cudaError_t.cudaSuccess,
        cudaError_t.cudaErrorPeerAccessAlreadyEnabled,
        cudaError_t.cudaErrorPeerAccessNotEnabled,
    )

    for rank in mapping.tp_group:
        dest_node = mapping.get_local_rank(rank)
        on_this_host = mapping.get_node_rank(rank) == mapping.node_rank
        if not on_this_host or dest_node == src_node:
            continue

        error, result = cudart.cudaDeviceCanAccessPeer(src_node, dest_node)
        _raise_if_error(error)
        if result == 0:
            raise RuntimeError(
                f"Can't enable access between nodes {src_node} and {dest_node}")

        # NOTE(review): these calls act on the current CUDA device; this
        # assumes the current device is already src_node -- confirm.
        if not enabled:
            cudart.cudaDeviceDisablePeerAccess(dest_node)
        else:
            cudart.cudaDeviceEnablePeerAccess(dest_node, 0)

        # The direct return values above are ignored on purpose; the status is
        # read back through cudaGetLastError instead.
        error = cudart.cudaGetLastError()[0]
        if error not in tolerated:
            raise RuntimeError(error)
class IpcMemory:
    """Device buffer shared between the ranks of a TP group through CUDA IPC.

    When ``mapping.tp_size <= mapping.gpus_per_node`` a buffer is allocated
    and IPC handles are exchanged with the peers. Otherwise no memory is
    allocated and every pointer is the placeholder ``0``.
    """

    # WARNING: Must be kept in sync with FLAGS_SIZE in cpp/include/tensorrt_llm/runtime/ipcUtils.h
    # (Max all reduce blocks + 1) * sizeof(int)
    IPC_BARRIERS_SIZE_PER_GPU = (24 + 1) * 4

    def __init__(self, mapping: Mapping, size: int):
        """Allocate *size* bytes and open the peers' buffers when IPC applies.

        Args:
            mapping: Rank layout; ``tp_size`` and ``gpus_per_node`` decide
                whether IPC is used.
            size: Size in bytes of the buffer allocated on the local device.
        """
        self.mapping = mapping
        # True only when tp_size does not exceed gpus_per_node.
        self.open_ipc = mapping.tp_size <= mapping.gpus_per_node
        if self.open_ipc:
            # peer_ptrs: one device pointer per TP rank; local_ptr: own buffer.
            self.peer_ptrs, self.local_ptr = IpcMemory.open_ipc_memory(
                self.mapping, size, True)
        else:
            self.peer_ptrs = [0] * mapping.tp_size
            self.local_ptr = 0

    def __del__(self):
        """Release the IPC buffers unless the interpreter is shutting down.

        Tolerates partially initialised instances: if ``__init__`` raised
        before ``open_ipc`` or ``peer_ptrs`` were assigned there is nothing to
        release, and the finalizer must not fail with ``AttributeError``.
        """
        if sys.is_finalizing() or not getattr(self, "open_ipc", False):
            return
        peer_ptrs = getattr(self, "peer_ptrs", None)
        if peer_ptrs is None:
            # open_ipc_memory failed, so no pointers were ever stored.
            return
        IpcMemory.close_ipc_memory(self.mapping, peer_ptrs)

    def serialize(self) -> List[int]:
        """Return the peer pointers as a list of unsigned 64-bit integers.

        The pointers are packed as native pointers (``"P"``) in one call and
        the resulting bytes are reinterpreted as ``"Q"`` items.
        """
        packed = struct.pack(f"{len(self.peer_ptrs)}P", *self.peer_ptrs)
        return array.array("Q", packed).tolist()

    @staticmethod
    def open_ipc_memory(mapping: Mapping,
                        size: int,
                        set_to_zero: bool = False) -> Tuple[List[int], int]:
        """Allocate a buffer of *size* bytes and open the peers' buffers via IPC.

        The local IPC handle is all-gathered over a communicator obtained with
        ``mpi_comm().Split(mapping.pp_rank, mapping.tp_rank)``, so this is a
        collective call across that communicator.

        Args:
            mapping: Rank layout; ``pp_rank`` and ``tp_rank`` are used.
            size: Number of bytes to allocate on the local device.
            set_to_zero: When true, the new buffer is zero-filled.

        Returns:
            ``(peer_ptrs, local_ptr)``. ``peer_ptrs[i]`` is the pointer for
            the i-th gathered handle; the entry at index ``mapping.tp_rank``
            is ``local_ptr`` itself. Release everything by passing
            ``peer_ptrs`` to ``close_ipc_memory``.

        Raises:
            RuntimeError: If any CUDA runtime call reports an error.
        """
        comm = mpi_comm().Split(mapping.pp_rank, mapping.tp_rank)

        error, local_ptr = cudart.cudaMalloc(size)
        _raise_if_error(error)
        if set_to_zero:
            _raise_if_error(cudart.cudaMemset(local_ptr, 0, size)[0])

        error, local_handle = cudart.cudaIpcGetMemHandle(local_ptr)
        _raise_if_error(error)

        # Only the raw ``reserved`` payload is exchanged; each rank rebuilds
        # the handle objects locally from it.
        handles = []
        for reserved in comm.allgather(local_handle.reserved):
            handle = cudart.cudaIpcMemHandle_t()
            handle.reserved = reserved
            handles.append(handle)

        peer_ptrs = []
        for node, handle in enumerate(handles):
            if node == mapping.tp_rank:
                # Our own buffer is used directly, not re-opened through IPC.
                peer_ptrs.append(local_ptr)
                continue
            error, ptr = cudart.cudaIpcOpenMemHandle(
                handle, cudart.cudaIpcMemLazyEnablePeerAccess)
            _raise_if_error(error)
            peer_ptrs.append(ptr)

        return peer_ptrs, local_ptr

    @staticmethod
    def close_ipc_memory(mapping: Mapping, peer_ptrs: List[int]):
        """Release the pointers returned by ``open_ipc_memory``.

        The entry at index ``mapping.tp_rank`` is freed with ``cudaFree``;
        every other entry is closed with ``cudaIpcCloseMemHandle``.

        Raises:
            RuntimeError: If any CUDA runtime call reports an error.
        """
        for node, ptr in enumerate(peer_ptrs):
            if node == mapping.tp_rank:
                _raise_if_error(cudart.cudaFree(ptr)[0])
            else:
                _raise_if_error(cudart.cudaIpcCloseMemHandle(ptr)[0])
|