|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from datetime import timedelta |
|
|
from typing import List |
|
|
|
|
|
|
|
|
def get_all(store, rank: int, prefix: str, size: int):
    r"""
    Given a store and a prefix, the method goes through the array of keys
    of the following format: ``{prefix}{idx}``, where idx is in a range
    from 0 to size, and tries to retrieve the data.

    The Rank0 process waits at the end to make sure all other processes
    finished the procedure before exiting.

    Usage

    ::

     values = get_all(store, rank, 'torchelastic/data', 3)
     value1 = values[0] # retrieves the data for key torchelastic/data0
     value2 = values[1] # retrieves the data for key torchelastic/data1
     value3 = values[2] # retrieves the data for key torchelastic/data2

    """
    # store.get is a blocking read: each lookup waits (up to the store's
    # timeout) until the corresponding key has been published.
    data_arr = [store.get(f"{prefix}{idx}") for idx in range(size)]
    # Mark this rank as done reading so rank 0 can tell when everyone
    # has retrieved all values.
    store.set(f"{prefix}{rank}.FIN", b"FIN")
    if rank == 0:
        # Rank0 runs the final barrier: wait until every rank (including
        # itself, whose FIN key was just set above) has finished reading,
        # so no process exits while others still depend on the store.
        for node_rank in range(size):
            store.get(f"{prefix}{node_rank}.FIN")
    return data_arr
|
|
|
|
|
|
|
|
def synchronize(
    store,
    data: bytes,
    rank: int,
    world_size: int,
    key_prefix: str,
    barrier_timeout: float = 300,
) -> List[bytes]:
    """
    Synchronizes ``world_size`` agents between each other using the underlying c10d store.
    The ``data`` will be available on each of the agents.

    Note: The data on the path is not deleted, as a result there can be stale data if
    you use the same key_prefix twice.
    """
    # Bound how long the blocking store reads below may wait.
    store.set_timeout(timedelta(seconds=barrier_timeout))
    # Publish this rank's payload, then gather every rank's payload.
    store.set(f"{key_prefix}{rank}", data)
    return get_all(store, rank, key_prefix, world_size)
|
|
|
|
|
|
|
|
def barrier(
    store, rank: int, world_size: int, key_prefix: str, barrier_timeout: float = 300
) -> None:
    """
    A global lock between agents.

    Note: Since the data is not removed from the store, the barrier can be used
    once per unique ``key_prefix``.
    """
    # The payload content is irrelevant for the barrier; the rank id is
    # published only so every participant has something to exchange.
    payload = str(rank).encode(encoding="UTF-8")
    synchronize(store, payload, rank, world_size, key_prefix, barrier_timeout)
|
|
|