File size: 4,900 Bytes
5000658
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# SPDX-FileCopyrightText: Copyright (c) 2022-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import array
import struct
import sys
from contextlib import contextmanager
from typing import List, Tuple

from cuda import cudart
from cuda.cudart import cudaError_t

from ._utils import mpi_comm
from .mapping import Mapping


def _raise_if_error(error: cudaError_t):
    """Turn a CUDA runtime status code into an exception.

    Returns silently for ``cudaSuccess``; any other status is raised as
    ``RuntimeError(error)``.
    """
    if error == cudaError_t.cudaSuccess:
        return
    raise RuntimeError(error)


@contextmanager
def peer_access(mapping: Mapping):
    """Scope CUDA peer access to the body of a ``with`` statement.

    Peer access is switched on via ``set_peer_access(mapping, True)`` before
    the body runs and switched off via ``set_peer_access(mapping, False)``
    when the body exits, whether normally or by exception.
    """
    set_peer_access(mapping, enabled=True)
    try:
        yield
    finally:
        # Runs both on normal exit and when the with-body raises.
        set_peer_access(mapping, enabled=False)


def set_peer_access(mapping: Mapping, enabled: bool = True):
    """Enable or disable CUDA peer access to the same-node GPUs of this rank's TP group.

    For every rank in ``mapping.tp_group`` that lives on this rank's node and
    is not this rank itself, ``cudaDeviceCanAccessPeer`` is checked between
    ``mapping.local_rank`` and the peer's local rank, and peer access to that
    peer is then enabled or disabled. Local ranks are used directly as CUDA
    device ordinals (assumes local rank ``i`` drives device ``i`` — TODO
    confirm). Enable/disable act on the *current* CUDA device, which
    presumably equals ``mapping.local_rank``; verify against callers.

    Args:
        mapping: parallelism mapping of the current rank.
        enabled: ``True`` to enable peer access, ``False`` to disable it.

    Raises:
        RuntimeError: if a CUDA call fails, or if a same-node peer is reported
            as not reachable. ``cudaErrorPeerAccessAlreadyEnabled`` and
            ``cudaErrorPeerAccessNotEnabled`` are tolerated in both directions.
    """
    # Statuses meaning "already in the requested state" are not failures.
    benign_errors = (
        cudaError_t.cudaSuccess,
        cudaError_t.cudaErrorPeerAccessAlreadyEnabled,
        cudaError_t.cudaErrorPeerAccessNotEnabled,
    )
    src_node = mapping.local_rank
    for rank in mapping.tp_group:
        dest_node = mapping.get_local_rank(rank)
        # Only other GPUs on this node can take part in peer access.
        if (mapping.get_node_rank(rank) != mapping.node_rank
                or dest_node == src_node):
            continue

        error, result = cudart.cudaDeviceCanAccessPeer(src_node, dest_node)
        _raise_if_error(error)

        if result == 0:
            raise RuntimeError(
                f"Can't enable access between nodes {src_node} and {dest_node}")

        # Use the status returned by the call itself: cudaGetLastError() could
        # also report a stale error left behind by an unrelated earlier call.
        if enabled:
            error = cudart.cudaDeviceEnablePeerAccess(dest_node, 0)[0]
        else:
            error = cudart.cudaDeviceDisablePeerAccess(dest_node)[0]
        if error not in benign_errors:
            raise RuntimeError(error)
        if error != cudaError_t.cudaSuccess:
            # The tolerated error is still recorded as this thread's last
            # runtime error; clear it so later cudaGetLastError() checks
            # elsewhere do not trip over it.
            cudart.cudaGetLastError()


class IpcMemory():
    """Device buffer shared via CUDA IPC among the ranks of a tensor-parallel group.

    Each rank allocates ``size`` bytes with ``cudaMalloc`` and the ranks that
    share ``mapping.pp_rank`` exchange IPC handles, so that ``peer_ptrs[i]`` is
    a device pointer to the buffer owned by TP rank ``i``.
    ``peer_ptrs[mapping.tp_rank]`` is this rank's own allocation, also kept in
    ``local_ptr``. When ``mapping.tp_size`` exceeds ``mapping.gpus_per_node``,
    nothing is allocated and every pointer is ``0``.
    """

    # WARNING: Must be kept in sync with FLAGS_SIZE in cpp/include/tensorrt_llm/runtime/ipcUtils.h
    # (Max all reduce blocks + 1) * sizeof(int)
    IPC_BARRIERS_SIZE_PER_GPU = (24 + 1) * 4

    def __init__(self, mapping: Mapping, size: int):
        """Open (or stub out) the shared buffer for *mapping*.

        Args:
            mapping: parallelism mapping of the current rank.
            size: number of bytes to allocate on each rank; the allocation
                is zeroed before it is shared.
        """
        self.mapping = mapping
        self.open_ipc = mapping.tp_size <= mapping.gpus_per_node
        if self.open_ipc:
            self.peer_ptrs, self.local_ptr = IpcMemory.open_ipc_memory(
                self.mapping, size, True)
        else:
            self.peer_ptrs = [0] * mapping.tp_size
            self.local_ptr = 0

    def __del__(self):
        # Nothing to release when IPC was never opened, or when __init__
        # raised before ``open_ipc``/``peer_ptrs`` were assigned (accessing
        # them unguarded would raise AttributeError from inside __del__).
        # Cleanup is skipped during interpreter shutdown, presumably because
        # the CUDA bindings may no longer be usable by then.
        if (not sys.is_finalizing() and getattr(self, "open_ipc", False)
                and hasattr(self, "peer_ptrs")):
            IpcMemory.close_ipc_memory(self.mapping, self.peer_ptrs)

    def serialize(self) -> List[int]:
        """Return ``peer_ptrs`` as a list of unsigned 64-bit integers.

        Each pointer is packed as a native ``void*`` (``struct`` format
        ``"P"``) and the bytes are reinterpreted as native unsigned 64-bit
        values (``array`` typecode ``"Q"``). ``struct.pack`` rejects values
        that do not fit in a pointer.
        """
        # A single join instead of repeated ``bytes +=`` (quadratic).
        buffer = b"".join(struct.pack("P", ptr) for ptr in self.peer_ptrs)
        return array.array("Q", buffer).tolist()

    @staticmethod
    def open_ipc_memory(mapping: Mapping,
                        size: int,
                        set_to_zero: bool = False) -> Tuple[List[int], int]:
        """Allocate *size* bytes on this rank and open the TP peers' buffers.

        The ranks sharing ``mapping.pp_rank`` are grouped into an MPI
        sub-communicator ordered by ``tp_rank``. They all-gather their CUDA IPC
        handles, and each rank opens every other rank's handle. This must be
        called collectively by all of those ranks.

        Args:
            mapping: parallelism mapping of the current rank.
            size: number of bytes to allocate on this rank.
            set_to_zero: zero the local allocation with ``cudaMemset`` before
                sharing it.

        Returns:
            ``(peer_ptrs, local_ptr)``. ``peer_ptrs[i]`` is a device pointer
            to TP rank ``i``'s buffer, and
            ``peer_ptrs[mapping.tp_rank] == local_ptr``. Release them with
            ``IpcMemory.close_ipc_memory(mapping, peer_ptrs)``.

        Raises:
            RuntimeError: if a CUDA call fails. The local allocation and any
                IPC handles already opened by this rank are released before
                the error propagates.
        """
        comm = mpi_comm().Split(mapping.pp_rank, mapping.tp_rank)

        error, local_ptr = cudart.cudaMalloc(size)
        _raise_if_error(error)
        try:
            if set_to_zero:
                _raise_if_error(cudart.cudaMemset(local_ptr, 0, size)[0])
            error, local_handle = cudart.cudaIpcGetMemHandle(local_ptr)
            _raise_if_error(error)
        except Exception:
            # Do not leak the allocation if it cannot be prepared for sharing.
            cudart.cudaFree(local_ptr)
            raise

        handles_reserved = comm.allgather(local_handle.reserved)
        # The sub-communicator is only needed for this exchange; free it
        # rather than leaking one communicator per allocation.
        comm.Free()

        handles = []
        for reserved in handles_reserved:
            handle = cudart.cudaIpcMemHandle_t()
            handle.reserved = reserved
            handles.append(handle)

        peer_ptrs = []
        try:
            for node, handle in enumerate(handles):
                if node == mapping.tp_rank:
                    peer_ptrs.append(local_ptr)
                else:
                    error, ptr = cudart.cudaIpcOpenMemHandle(
                        handle, cudart.cudaIpcMemLazyEnablePeerAccess)
                    _raise_if_error(error)
                    peer_ptrs.append(ptr)
        except Exception:
            # Best-effort rollback: close the handles opened so far and free
            # the local buffer. Their own status codes are ignored so that the
            # original failure is the one reported.
            for node, ptr in enumerate(peer_ptrs):
                if node != mapping.tp_rank:
                    cudart.cudaIpcCloseMemHandle(ptr)
            cudart.cudaFree(local_ptr)
            raise

        return peer_ptrs, local_ptr

    @staticmethod
    def close_ipc_memory(mapping: Mapping, peer_ptrs: List[int]):
        """Release the pointers returned by ``open_ipc_memory``.

        Frees this rank's own allocation (index ``mapping.tp_rank``) with
        ``cudaFree`` and closes every other pointer with
        ``cudaIpcCloseMemHandle``. Every pointer is released even if one
        release fails; afterwards the first non-success status is raised as
        ``RuntimeError``.
        """
        first_error = cudaError_t.cudaSuccess
        for node, ptr in enumerate(peer_ptrs):
            if node == mapping.tp_rank:
                error = cudart.cudaFree(ptr)[0]
            else:
                error = cudart.cudaIpcCloseMemHandle(ptr)[0]
            # Keep only the first failure; continue releasing the rest.
            if first_error == cudaError_t.cudaSuccess:
                first_error = error
        _raise_if_error(first_error)