from collections import defaultdict
import os
import platform
import shutil
import subprocess
import sys
import warnings

import numpy as np

try:
    from mpi4py import MPI
except ImportError:
    MPI = None


def sync_from_root(sess, variables, comm=None):
    """
    Send the root node's parameters to every worker.

    Arguments:
        sess: the TensorFlow session
        variables: all parameter variables, including the optimizer's
    """
    if comm is None:
        comm = MPI.COMM_WORLD
    import tensorflow as tf  # imported lazily so the module is usable without TF
    values = comm.bcast(sess.run(variables))
    sess.run([tf.compat.v1.assign(var, val)
              for (var, val) in zip(variables, values)])
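
# A minimal usage sketch (assuming a TF1-style session and an MPI launch such
# as `mpiexec -n 4 python train.py`; kept as a comment so importing this
# module has no side effects):
#
#     sess = tf.compat.v1.Session()
#     sess.run(tf.compat.v1.global_variables_initializer())
#     sync_from_root(sess, tf.compat.v1.global_variables())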


def gpu_count():
    """
    Count the GPUs on this machine.
    """
    if shutil.which('nvidia-smi') is None:
        return 0
    output = subprocess.check_output(['nvidia-smi', '--query-gpu=gpu_name', '--format=csv'])
    # The CSV output is one header line plus one line per GPU, with a trailing
    # newline, so subtract 2 from the line count.
    return max(0, len(output.split(b'\n')) - 2)


def setup_mpi_gpus():
    """
    Set CUDA_VISIBLE_DEVICES to this process's local MPI rank, if not already set.
    """
    if 'CUDA_VISIBLE_DEVICES' not in os.environ:
        if sys.platform == 'darwin':  # no CUDA GPUs on Macs; hide all devices
            ids = []
        else:
            lrank, _lsize = get_local_rank_size(MPI.COMM_WORLD)
            ids = [lrank]
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, ids))
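
# Usage sketch (assuming one GPU per process on each machine): call this once
# at startup, before any CUDA context is created, so each worker only sees
# its own device:
#
#     setup_mpi_gpus()
#     # ... now build the TF session; it will see a single GPU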


def get_local_rank_size(comm):
    """
    Returns this process's rank on its machine, along with the number of
    processes on that machine. The processes on a given machine are assigned
    local ranks 0, 1, 2, ..., N-1, where N is the number of processes on
    that machine.

    Useful if you want to assign one GPU per process on each machine.
    """
    this_node = platform.node()
    ranks_nodes = comm.allgather((comm.Get_rank(), this_node))
    node2rankssofar = defaultdict(int)
    local_rank = None
    for (rank, node) in ranks_nodes:
        if rank == comm.Get_rank():
            local_rank = node2rankssofar[node]
        node2rankssofar[node] += 1
    assert local_rank is not None
    return local_rank, node2rankssofar[this_node]
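
# Worked example (hypothetical cluster): with global ranks 0 and 1 on host
# 'node-a' and ranks 2 and 3 on host 'node-b', rank 2 gets back (0, 2) --
# local rank 0 of the 2 processes running on 'node-b'.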


def share_file(comm, path):
    """
    Copies the file from rank 0 to all other ranks.
    Puts it in the same place on all machines.
    """
    localrank, _ = get_local_rank_size(comm)
    if comm.Get_rank() == 0:
        with open(path, 'rb') as fh:
            data = fh.read()
        comm.bcast(data)
    else:
        data = comm.bcast(None)
        if localrank == 0:
            dirname = os.path.dirname(path)
            if dirname:  # os.makedirs('') raises, so skip bare filenames
                os.makedirs(dirname, exist_ok=True)
            with open(path, 'wb') as fh:
                fh.write(data)
    comm.Barrier()
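
# Usage sketch (the path is hypothetical): after rank 0 produces a checkpoint,
# one process per machine writes a local copy at the same path:
#
#     share_file(MPI.COMM_WORLD, '/tmp/checkpoints/model.pkl')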


def dict_gather(comm, d, op='mean', assert_all_have_data=True):
    """
    Perform a reduction operation over dicts.
    """
    if comm is None:
        return d
    alldicts = comm.allgather(d)
    size = comm.size
    k2li = defaultdict(list)
    for rankdict in alldicts:
        for (k, v) in rankdict.items():
            k2li[k].append(v)
    result = {}
    for (k, li) in k2li.items():
        if assert_all_have_data:
            assert len(li) == size, "only %i out of %i MPI workers have sent '%s'" % (len(li), size, k)
        if op == 'mean':
            result[k] = np.mean(li, axis=0)
        elif op == 'sum':
            result[k] = np.sum(li, axis=0)
        else:
            assert 0, op
    return result
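
# Worked example (values hypothetical): with comm.size == 2, rank 0 holding
# {'reward': 1.0} and rank 1 holding {'reward': 3.0},
# dict_gather(comm, d, op='mean') returns {'reward': 2.0} on every rank,
# since allgather hands each rank both values.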


def mpi_weighted_mean(comm, local_name2valcount):
    """
    Perform a weighted average over dicts that are each on a different node.

    Input: local_name2valcount: dict mapping key -> (value, count)
    Returns: key -> mean
    """
    all_name2valcount = comm.gather(local_name2valcount)
    if comm.rank == 0:
        name2sum = defaultdict(float)
        name2count = defaultdict(float)
        for n2vc in all_name2valcount:
            for (name, (val, count)) in n2vc.items():
                try:
                    val = float(val)
                except ValueError:
                    warnings.warn('WARNING: tried to compute mean on non-float {}={}'.format(name, val))
                else:
                    name2sum[name] += val * count
                    name2count[name] += count
        return {name: name2sum[name] / name2count[name] for name in name2sum}
    else:
        return {}
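
# Worked example (values hypothetical): with rank 0 contributing
# {'loss': (1.0, 2)} and rank 1 contributing {'loss': (4.0, 1)}, rank 0
# returns {'loss': (1.0*2 + 4.0*1) / 3} == {'loss': 2.0}; every other rank
# returns {}.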