import collections
import itertools
import math
import os
import pathlib
import re

import pynvml


class Device:
    # NVML reports CPU affinity as an array of 64-bit masks
    _nvml_affinity_elements = math.ceil(os.cpu_count() / 64)

    def __init__(self, device_idx):
        super().__init__()
        self.handle = pynvml.nvmlDeviceGetHandleByIndex(device_idx)

    def get_name(self):
        return pynvml.nvmlDeviceGetName(self.handle)

    def get_uuid(self):
        return pynvml.nvmlDeviceGetUUID(self.handle)

    def get_cpu_affinity(self):
        affinity_string = ""
        for j in pynvml.nvmlDeviceGetCpuAffinity(self.handle, Device._nvml_affinity_elements):
            # assume nvml returns a list of 64-bit ints
            affinity_string = "{:064b}".format(j) + affinity_string

        affinity_list = [int(x) for x in affinity_string]
        affinity_list.reverse()  # so core 0 is the 0th element of the list

        ret = [i for i, e in enumerate(affinity_list) if e != 0]
        return ret
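
# A worked illustration of get_cpu_affinity() (hypothetical mask values, not
# part of the original module): on a machine with at most 64 CPUs, NVML
# returns a single 64-bit mask. For a mask whose low bits are 0b1011, the
# binary-string decoding above yields the logical cores [0, 1, 3]: bit i set
# means logical core i belongs to the socket directly connected to the GPU.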


def get_thread_siblings_list():
    """
    Returns a list of 2-element integer tuples representing pairs of
    hyperthreading cores.
    """
    path = "/sys/devices/system/cpu/cpu*/topology/thread_siblings_list"
    thread_siblings_list = []
    pattern = re.compile(r"(\d+)\D(\d+)")
    # path[0] is the filesystem root "/", path[1:] is the glob pattern below it
    for fname in pathlib.Path(path[0]).glob(path[1:]):
        with open(fname) as f:
            content = f.read().strip()
            res = pattern.findall(content)
            if res:
                pair = tuple(map(int, res[0]))
                thread_siblings_list.append(pair)
    return thread_siblings_list
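
# For reference: each thread_siblings_list file holds a single line such as
# "0,64" (comma-separated) or "0-1" (a range), depending on the kernel. The
# permissive regex above accepts any single non-digit separator, so both
# forms parse to an integer pair, e.g. (0, 64).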


def check_socket_affinities(socket_affinities):
    # sets of cores should be either identical or disjoint
    for i, j in itertools.product(socket_affinities, socket_affinities):
        if not set(i) == set(j) and not set(i).isdisjoint(set(j)):
            raise RuntimeError(f"Sets of cores should be either identical or disjoint, but got {i} and {j}.")
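
# Example of the invariant enforced above (hypothetical core lists):
# [[0, 1], [0, 1], [2, 3]] passes because every pair of sets is identical or
# disjoint; [[0, 1], [1, 2]] raises RuntimeError because the sets overlap
# only partially.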


def get_socket_affinities(nproc_per_node, exclude_unavailable_cores=True):
    devices = [Device(i) for i in range(nproc_per_node)]
    socket_affinities = [dev.get_cpu_affinity() for dev in devices]

    if exclude_unavailable_cores:
        available_cores = os.sched_getaffinity(0)
        socket_affinities = [list(set(affinity) & available_cores) for affinity in socket_affinities]

    check_socket_affinities(socket_affinities)

    return socket_affinities
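
# On a hypothetical 2-socket machine with 4 GPUs (2 per socket) this returns
# one list of logical cores per GPU, identical for GPUs that share a socket,
# e.g. [[0, 1, 2, 3], [0, 1, 2, 3], [4, 5, 6, 7], [4, 5, 6, 7]].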


def set_socket_affinity(gpu_id):
    """
    The process is assigned all available logical CPU cores from the CPU
    socket connected to the GPU with the given id.

    Args:
        gpu_id: index of a GPU
    """
    dev = Device(gpu_id)
    affinity = dev.get_cpu_affinity()
    os.sched_setaffinity(0, affinity)


def set_single_affinity(gpu_id):
    """
    The process is assigned the first available logical CPU core from the
    list of all CPU cores from the CPU socket connected to the GPU with the
    given id.

    Args:
        gpu_id: index of a GPU
    """
    dev = Device(gpu_id)
    affinity = dev.get_cpu_affinity()

    # exclude cores that are not available to this process
    available_cores = os.sched_getaffinity(0)
    affinity = list(set(affinity) & available_cores)
    os.sched_setaffinity(0, affinity[:1])


def set_single_unique_affinity(gpu_id, nproc_per_node):
    """
    The process is assigned a single unique available physical CPU core
    from the list of all CPU cores from the CPU socket connected to the GPU
    with the given id.

    Args:
        gpu_id: index of a GPU
        nproc_per_node: total number of processes per node
    """
    socket_affinities = get_socket_affinities(nproc_per_node)

    siblings_list = get_thread_siblings_list()
    siblings_dict = dict(siblings_list)

    # remove hyperthreading siblings, keeping one logical core per physical core
    for idx, socket_affinity in enumerate(socket_affinities):
        socket_affinities[idx] = list(set(socket_affinity) - set(siblings_dict.values()))

    affinities = []
    assigned = []

    for socket_affinity in socket_affinities:
        for core in socket_affinity:
            if core not in assigned:
                affinities.append([core])
                assigned.append(core)
                break
    os.sched_setaffinity(0, affinities[gpu_id])
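
# With the hypothetical affinities [[0, 1, 2, 3], [0, 1, 2, 3], [4, 5, 6, 7],
# [4, 5, 6, 7]] (siblings already removed, lists in ascending order), the
# greedy loop above assigns GPU 0 -> core 0, GPU 1 -> core 1, GPU 2 -> core 4,
# GPU 3 -> core 5: one unique physical core per process.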


def set_socket_unique_affinity(gpu_id, nproc_per_node, mode, balanced=True):
    """
    The process is assigned a unique subset of available physical CPU
    cores from the CPU socket connected to the GPU with the given id.
    Assignment automatically includes hyperthreading siblings (if siblings are
    available).

    Args:
        gpu_id: index of a GPU
        nproc_per_node: total number of processes per node
        mode: assignment pattern, 'interleaved' or 'continuous'
        balanced: assign an equal number of physical cores to each process
    """
    socket_affinities = get_socket_affinities(nproc_per_node)

    siblings_list = get_thread_siblings_list()
    siblings_dict = dict(siblings_list)

    # remove hyperthreading siblings, keeping one logical core per physical core
    for idx, socket_affinity in enumerate(socket_affinities):
        socket_affinities[idx] = list(set(socket_affinity) - set(siblings_dict.values()))

    socket_affinities_to_device_ids = collections.defaultdict(list)

    for idx, socket_affinity in enumerate(socket_affinities):
        socket_affinities_to_device_ids[tuple(socket_affinity)].append(idx)

    # number of physical cores available per GPU on the most constrained
    # socket; used to balance the assignment across sockets
    min_physical_cores_per_gpu = min(
        [len(cores) // len(gpus) for cores, gpus in socket_affinities_to_device_ids.items()]
    )

    for socket_affinity, device_ids in socket_affinities_to_device_ids.items():
        devices_per_group = len(device_ids)
        if balanced:
            cores_per_device = min_physical_cores_per_gpu
            socket_affinity = socket_affinity[: devices_per_group * min_physical_cores_per_gpu]
        else:
            cores_per_device = len(socket_affinity) // devices_per_group

        for group_id, device_id in enumerate(device_ids):
            if device_id == gpu_id:
                # 'interleaved' takes every devices_per_group-th core starting
                # at this process's offset; 'continuous' takes a contiguous
                # block of cores_per_device cores
                if mode == "interleaved":
                    affinity = list(socket_affinity[group_id::devices_per_group])
                elif mode == "continuous":
                    affinity = list(socket_affinity[group_id * cores_per_device: (group_id + 1) * cores_per_device])
                else:
                    raise RuntimeError("Unknown set_socket_unique_affinity mode")

                # reintroduce hyperthreading siblings of the selected cores
                affinity += [siblings_dict[aff] for aff in affinity if aff in siblings_dict]
                os.sched_setaffinity(0, affinity)
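
# A concrete sketch of the two patterns (hypothetical socket with physical
# cores 0-7 shared by 2 GPUs, balanced=True): 'interleaved' selects
# [0, 2, 4, 6] for the first GPU and [1, 3, 5, 7] for the second, while
# 'continuous' selects [0, 1, 2, 3] and [4, 5, 6, 7]; hyperthreading siblings
# of the selected cores are appended afterwards in both cases.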


def set_affinity(gpu_id, nproc_per_node, mode="socket_unique_continuous", balanced=True):
    """
    The process is assigned a proper CPU affinity that matches the hardware
    architecture of the given platform. Usually it improves and stabilizes the
    performance of deep learning training workloads.

    This function assumes that the workload is running in multi-process
    single-device mode (there are multiple training processes, and each process
    is running on a single GPU), which is typical for multi-GPU training
    workloads using `torch.nn.parallel.DistributedDataParallel`.

    Available affinity modes:
    * 'socket' - the process is assigned all available logical CPU cores
    from the CPU socket connected to the GPU with the given id.
    * 'single' - the process is assigned the first available logical CPU
    core from the list of all CPU cores from the CPU socket connected to the
    GPU with the given id (multiple GPUs could be assigned the same CPU core).
    * 'single_unique' - the process is assigned a single unique available
    physical CPU core from the list of all CPU cores from the CPU socket
    connected to the GPU with the given id.
    * 'socket_unique_interleaved' - the process is assigned a unique
    subset of available physical CPU cores from the CPU socket connected to
    the GPU with the given id, hyperthreading siblings are included
    automatically, cores are assigned with an interleaved indexing pattern.
    * 'socket_unique_continuous' - (the default) the process is assigned a
    unique subset of available physical CPU cores from the CPU socket
    connected to the GPU with the given id, hyperthreading siblings are
    included automatically, cores are assigned with a continuous indexing
    pattern.

    'socket_unique_continuous' is the recommended mode for deep learning
    training workloads on NVIDIA DGX machines.

    Args:
        gpu_id: integer index of a GPU
        nproc_per_node: number of processes per node
        mode: affinity mode
        balanced: assign an equal number of physical cores to each process,
            affects only 'socket_unique_interleaved' and
            'socket_unique_continuous' affinity modes

    Returns a set of logical CPU cores on which the process is eligible to run.

    Example:

    import argparse
    import os

    import gpu_affinity
    import torch


    def main():
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--local_rank',
            type=int,
            default=os.getenv('LOCAL_RANK', 0),
        )
        args = parser.parse_args()

        nproc_per_node = torch.cuda.device_count()

        affinity = gpu_affinity.set_affinity(args.local_rank, nproc_per_node)
        print(f'{args.local_rank}: core affinity: {affinity}')


    if __name__ == "__main__":
        main()

    Launch the example with:
    python -m torch.distributed.launch --nproc_per_node <#GPUs> example.py


    WARNING: On DGX A100, only half of the CPU cores have direct access to
    GPUs. This function restricts execution to only the CPU cores directly
    connected to GPUs, so on DGX A100 it will limit the code to half of the
    CPU cores and half of the CPU memory bandwidth (which may be fine for
    many DL models).
    """
    pynvml.nvmlInit()

    if mode == "socket":
        set_socket_affinity(gpu_id)
    elif mode == "single":
        set_single_affinity(gpu_id)
    elif mode == "single_unique":
        set_single_unique_affinity(gpu_id, nproc_per_node)
    elif mode == "socket_unique_interleaved":
        set_socket_unique_affinity(gpu_id, nproc_per_node, "interleaved", balanced)
    elif mode == "socket_unique_continuous":
        set_socket_unique_affinity(gpu_id, nproc_per_node, "continuous", balanced)
    else:
        raise RuntimeError("Unknown affinity mode")

    affinity = os.sched_getaffinity(0)
    return affinity
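

# Minimal smoke test, an illustrative sketch rather than part of the original
# module: running this file directly on a machine with NVIDIA GPUs and pynvml
# installed prints the affinity chosen for GPU 0 with the default mode.
if __name__ == "__main__":
    pynvml.nvmlInit()
    num_gpus = pynvml.nvmlDeviceGetCount()
    affinity = set_affinity(gpu_id=0, nproc_per_node=num_gpus)
    print(f"GPU 0: affinity set to {sorted(affinity)}")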