|
|
|
|
|
import warnings |
|
|
warnings.simplefilter('ignore') |
|
|
|
|
|
import argparse |
|
|
import sys, os |
|
|
import gc |
|
|
import shutil |
|
|
|
|
|
import multiprocessing |
|
|
from multiprocessing import Pool |
|
|
|
|
|
import matplotlib.pyplot as plt |
|
|
from scipy.stats import qmc |
|
|
import numpy as np |
|
|
import glob |
|
|
import h5py |
|
|
import fcntl |
|
|
import time |
|
|
from time import sleep |
|
|
from pathlib import Path |
|
|
import datetime |
|
|
|
|
|
|
|
|
try: |
|
|
from mpi4py import MPI |
|
|
comm = MPI.COMM_WORLD |
|
|
rank = comm.Get_rank() |
|
|
size = comm.Get_size() |
|
|
except ImportError: |
|
|
rank = 0 |
|
|
size = 1 |
|
|
|
|
|
str_pad_len = 80 |
|
|
str_pad_type = '-' |
|
|
|
|
|
class Generator(): |
|
|
def __init__(self, params_ranges, **kwargs): |
|
|
""" |
|
|
Generate dataset by 21cmFAST in parallel. |
|
|
Input: params_ranges = {'param1': [min, max], 'param2': [min, max], ...} |
|
|
Output: hdf5 storing images and params. |
|
|
""" |
|
|
self.import_py21cmfast() |
|
|
self.params_ranges = params_ranges.copy() |
|
|
|
|
|
self.define_kwargs(kwargs) |
|
|
|
|
|
|
|
|
if rank == 0: |
|
|
self.print_kwargs_params() |
|
|
|
|
|
def import_py21cmfast(self): |
|
|
|
|
|
|
|
|
self.default_cache_direc = None |
|
|
global p21c |
|
|
|
|
|
if rank == 0: |
|
|
import py21cmfast as p21c |
|
|
self.default_cache_direc = os.path.join(Path.home(),"21cmFAST-cache") |
|
|
|
|
|
|
|
|
|
|
|
os.makedirs(os.path.join(self.default_cache_direc,'wisdoms'), exist_ok=True) |
|
|
|
|
|
|
|
|
|
|
|
if 'comm' in globals(): |
|
|
|
|
|
self.default_cache_direc = comm.bcast(self.default_cache_direc, root=0) |
|
|
|
|
|
import py21cmfast as p21c |
|
|
|
|
|
@property |
|
|
def params_ranges(self): |
|
|
if not hasattr(self, '_params_ranges'): |
|
|
self._params_ranges = "Error." |
|
|
return self._params_ranges |
|
|
|
|
|
@params_ranges.setter |
|
|
def params_ranges(self, value): |
|
|
self._params_ranges = value |
|
|
for key, value in self._params_ranges.items(): |
|
|
if type(value) != list: |
|
|
self._params_ranges[key] = [value] |
|
|
|
|
|
def print_kwargs_params(self): |
|
|
if self.kwargs['verbose'] >= 1: |
|
|
print(f" Mission: Generate {self.kwargs['num_images']} images by {size}*{self.kwargs['cpus_per_node']} CPUs ".center(str_pad_len, '#')) |
|
|
print(f" params: ".center(int(str_pad_len/2),str_pad_type)+f" ranges: ".center(int(str_pad_len/2),str_pad_type)) |
|
|
for key in self.params_ranges: |
|
|
print(f"{key}".center(int(str_pad_len/2))+f"[{self.params_ranges[key][0]}, {self.params_ranges[key][-1]}]".center(int(str_pad_len/2))) |
|
|
|
|
|
if self.kwargs['verbose'] >= 2: |
|
|
print(f" kwargs: ".center(int(str_pad_len/2), str_pad_type)+f" values: ".center(int(str_pad_len/2),str_pad_type)) |
|
|
|
|
|
for key in self.kwargs: |
|
|
print(f"{key}".center(int(str_pad_len/2))+f"{self.kwargs[key]}".center(int(str_pad_len/2))) |
|
|
|
|
|
|
|
|
def define_kwargs(self, kwargs): |
|
|
self.kwargs = dict( |
|
|
|
|
|
p21c_run = 'lightcone', |
|
|
num_images = 9, |
|
|
fields = ['brightness_temp',], |
|
|
verbose = 2, |
|
|
seed = None, |
|
|
save_direc_name = "21cmDataset.h5", |
|
|
|
|
|
|
|
|
|
|
|
strength = 1, |
|
|
|
|
|
|
|
|
redshift = [8,10], |
|
|
|
|
|
|
|
|
|
|
|
BOX_LEN = 150, |
|
|
HII_DIM = 60, |
|
|
USE_INTERPOLATION_TABLES = True, |
|
|
USE_TS_FLUCT = True, |
|
|
|
|
|
|
|
|
SIGMA_8 = 0.810, |
|
|
hlittle = 0.677, |
|
|
OMm = 0.310, |
|
|
OMb = 0.0490, |
|
|
POWER_INDEX = 0.967, |
|
|
|
|
|
|
|
|
cpus_per_node = len(os.sched_getaffinity(0)), |
|
|
cache_rmdir = True, |
|
|
) |
|
|
|
|
|
|
|
|
self.kwargs = self.kwargs | kwargs |
|
|
|
|
|
if type(self.kwargs['redshift']) != list: |
|
|
self.kwargs['redshift'] = [self.kwargs['redshift']] |
|
|
|
|
|
if type(self.kwargs['fields']) != list: |
|
|
self.kwargs['fields'] = [self.kwargs['fields']] |
|
|
|
|
|
if self.kwargs['num_images'] < size: |
|
|
if self.kwargs['verbose'] > 0: print(f"num_images {self.kwargs['num_images']} must be >= the number of nodes {size}.") |
|
|
self.kwargs['num_images'] = size |
|
|
|
|
|
if 'cache_direc' not in self.kwargs: |
|
|
self.kwargs['cache_direc'] = os.path.join( |
|
|
os.path.dirname(self.kwargs['save_direc_name']), |
|
|
'_cache', str(rank), |
|
|
) |
|
|
|
|
|
if not os.path.exists(self.kwargs['cache_direc']) and self.kwargs['write']: |
|
|
os.makedirs(self.kwargs['cache_direc']) |
|
|
p21c.config['direc'] = self.kwargs['cache_direc'] |
|
|
|
|
|
if 'write' not in self.kwargs: |
|
|
self.kwargs['write'] = self.kwargs['seed'] != None |
|
|
|
|
|
def sample_normalized_params(self): |
|
|
""" |
|
|
sample and scatter to other nodes |
|
|
""" |
|
|
np.random.seed(self.kwargs['seed']) |
|
|
if rank == 0: |
|
|
sampler = qmc.LatinHypercube(d=len(self.params_ranges), strength=self.kwargs['strength'], seed=np.random.default_rng(self.kwargs['seed'])) |
|
|
sample = sampler.random(n=self.kwargs['num_images']) |
|
|
send_data = np.array_split(sample, size, axis=0) |
|
|
|
|
|
if self.kwargs['verbose'] >= 2: |
|
|
print(f" Process {rank} scatters data {sample.shape} to {size} nodes ".center(str_pad_len,str_pad_type)) |
|
|
else: |
|
|
send_data = None |
|
|
|
|
|
if 'comm' in globals(): |
|
|
recv_data = comm.scatter(send_data, root=0) |
|
|
else: |
|
|
recv_data = send_data |
|
|
|
|
|
return recv_data |
|
|
|
|
|
|
|
|
def denormalize(self, normalized_data): |
|
|
""" |
|
|
denormalize data received, and return self.params_node which stores params for each node. |
|
|
""" |
|
|
self.params_node = {} |
|
|
for i, kind in enumerate(self.params_ranges): |
|
|
x = normalized_data.T[i] |
|
|
k = self.params_ranges[kind][-1]-self.params_ranges[kind][0] |
|
|
b = self.params_ranges[kind][0] |
|
|
self.params_node[kind] = k*x + b |
|
|
|
|
|
|
|
|
def return_coeval_or_lightcone(self, kwargs_params_cpu, random_seed): |
|
|
if self.kwargs['p21c_run'] == 'coeval': |
|
|
coevals_cpu = p21c.run_coeval( |
|
|
redshift = kwargs_params_cpu['redshift'], |
|
|
user_params = kwargs_params_cpu, |
|
|
cosmo_params = p21c.CosmoParams(kwargs_params_cpu), |
|
|
astro_params = p21c.AstroParams(kwargs_params_cpu), |
|
|
flag_options = p21c.FlagOptions(kwargs_params_cpu), |
|
|
random_seed = random_seed, |
|
|
write = kwargs_params_cpu['write'], |
|
|
) |
|
|
dict_cpu = self.coevals2dict(coevals_cpu) |
|
|
del coevals_cpu |
|
|
|
|
|
elif self.kwargs['p21c_run'] == 'lightcone': |
|
|
lightcone_cpu = p21c.run_lightcone( |
|
|
redshift = kwargs_params_cpu['redshift'][0], |
|
|
|
|
|
z_heat_max = kwargs_params_cpu['redshift'][-1], |
|
|
lightcone_quantities = kwargs_params_cpu['fields'], |
|
|
user_params = kwargs_params_cpu, |
|
|
cosmo_params = p21c.CosmoParams(kwargs_params_cpu), |
|
|
astro_params = p21c.AstroParams(kwargs_params_cpu), |
|
|
flag_options = p21c.FlagOptions(kwargs_params_cpu), |
|
|
random_seed = random_seed, |
|
|
write = kwargs_params_cpu['write'], |
|
|
) |
|
|
|
|
|
|
|
|
dict_cpu = self.lightcone2dict(lightcone_cpu) |
|
|
del lightcone_cpu |
|
|
|
|
|
gc.collect() |
|
|
|
|
|
return dict_cpu |
|
|
|
|
|
|
|
|
def pool_run(self, params_node_value): |
|
|
|
|
|
pool_run_start = time.perf_counter() |
|
|
|
|
|
pid_cpu = multiprocessing.current_process().pid |
|
|
|
|
|
random_seed = int(params_node_value[-1]) |
|
|
params_cpu = {key: params_node_value[i] for (i, key) in enumerate(self.params_node.keys())} |
|
|
|
|
|
|
|
|
|
|
|
kwargs_params_cpu = self.kwargs | params_cpu |
|
|
|
|
|
|
|
|
dict_cpu = self.return_coeval_or_lightcone(kwargs_params_cpu,random_seed) |
|
|
|
|
|
|
|
|
cache_pattern = os.path.join(self.kwargs['cache_direc'], f"*r{random_seed}.h5") |
|
|
for filename in glob.glob(cache_pattern): |
|
|
os.remove(filename) |
|
|
|
|
|
pool_run_end = time.perf_counter() |
|
|
|
|
|
time_elapsed = time.strftime("%H:%M:%S", time.gmtime(pool_run_end - pool_run_start)) |
|
|
|
|
|
async_save_time = self.async_save(dict_cpu, np.expand_dims(params_node_value, axis=0)) |
|
|
|
|
|
if self.kwargs['verbose'] > 2: |
|
|
print(f'cpu {pid_cpu}-{rank}, {time_elapsed}, {async_save_time}, params {list(params_cpu.values())}, seed {random_seed}') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def lightcone2dict(self, lightcone_cpu): |
|
|
images_cpu = {} |
|
|
for i, field in enumerate(self.kwargs['fields']): |
|
|
images_cpu[field] = np.expand_dims(lightcone_cpu.lightcones[field], axis=0) |
|
|
|
|
|
images_cpu["redshifts_distances"] = np.vstack((lightcone_cpu.lightcone_redshifts, lightcone_cpu.lightcone_distances)) |
|
|
|
|
|
return images_cpu |
|
|
|
|
|
|
|
|
def coevals2dict(self, coevals_cpu): |
|
|
images_cpu = {} |
|
|
for i, field in enumerate(self.kwargs['fields']): |
|
|
images_cpu[field] = [] |
|
|
for j, coeval in enumerate(coevals_cpu): |
|
|
images_cpu[field].append(coeval.__dict__[field]) |
|
|
return images_cpu |
|
|
|
|
|
def cache_rmdir(self): |
|
|
|
|
|
if os.path.exists(self.kwargs['cache_direc']) and len(os.listdir(self.kwargs['cache_direc'])) == 0: |
|
|
os.rmdir(self.kwargs['cache_direc']) |
|
|
|
|
|
if 'comm' in globals(): |
|
|
|
|
|
recv_data = comm.gather(rank, root=0) |
|
|
|
|
|
|
|
|
if rank == 0: |
|
|
if os.path.exists(os.path.dirname(self.kwargs['cache_direc'])) and len(os.listdir(os.path.dirname(self.kwargs['cache_direc']))) == 0: |
|
|
os.rmdir(os.path.dirname(self.kwargs['cache_direc'])) |
|
|
if os.path.exists(self.default_cache_direc) and len(os.listdir(self.default_cache_direc)) == 1: |
|
|
|
|
|
shutil.rmtree(self.default_cache_direc) |
|
|
|
|
|
|
|
|
|
|
|
def run(self): |
|
|
|
|
|
normalized_params = self.sample_normalized_params() |
|
|
self.denormalize(normalized_params) |
|
|
|
|
|
pid_node = os.getpid() |
|
|
|
|
|
cpus_per_node = self.kwargs['cpus_per_node'] |
|
|
|
|
|
if self.kwargs['verbose'] >= 3: |
|
|
print(f" node {rank}: {cpus_per_node} CPUs, params.shape {np.array(list(self.params_node.values())).T.shape} ".center(str_pad_len,str_pad_type)) |
|
|
|
|
|
iterables = np.array(list(self.params_node.values())) |
|
|
random_seeds = np.random.randint(1,2**63, size = iterables.shape[-1]) |
|
|
iterables = np.vstack((iterables, random_seeds)).T |
|
|
|
|
|
|
|
|
loop_num = np.ceil(iterables.shape[0]/cpus_per_node) |
|
|
for iterable in np.array_split(iterables, loop_num, axis=0): |
|
|
with Pool(cpus_per_node) as p: |
|
|
Pool_start = time.perf_counter() |
|
|
|
|
|
p.map(self.pool_run, iterable) |
|
|
|
|
|
Pool_end = time.perf_counter() |
|
|
time_elapsed = time.strftime("%H:%M:%S", time.gmtime(Pool_end - Pool_start)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if self.kwargs['verbose'] >= 2 and False: |
|
|
print(f"{time_elapsed}, node {rank}: {images_node_MB} MB images {[np.shape(images_node[field]) for field in self.kwargs['fields']]} ->{async_save_time}-> {os.path.basename(self.kwargs['save_direc_name'])}") |
|
|
|
|
|
if self.kwargs['cache_rmdir'] == True: |
|
|
self.cache_rmdir() |
|
|
|
|
|
def dict2images(self, dict_node): |
|
|
images_node = {} |
|
|
images_node_MB = [] |
|
|
for field in self.kwargs['fields']: |
|
|
images_node[field] = [] |
|
|
for dict_cpu in dict_node: |
|
|
images_node[field].append(dict_cpu[field]) |
|
|
images_node[field] = np.array(images_node[field]) |
|
|
images_node_MB.append(round(images_node[field].nbytes / 1024**2)) |
|
|
|
|
|
if 'redshifts_distances' in dict_cpu: |
|
|
images_node['redshifts_distances'] = dict_cpu['redshifts_distances'] |
|
|
|
|
|
return images_node, images_node_MB |
|
|
|
|
|
def async_save(self, images_node, params_seeds): |
|
|
try_start = time.perf_counter() |
|
|
while True: |
|
|
try: |
|
|
save_start = time.perf_counter() |
|
|
try_time = save_start - try_start |
|
|
self.save(images_node, params_seeds) |
|
|
save_end = time.perf_counter() |
|
|
save_time = save_end - save_start |
|
|
return f"{try_time:.1f}s/{save_time:.2f}s" |
|
|
|
|
|
except IOError or BlockingIOError: |
|
|
if try_time > 30: |
|
|
print(f"cpu {multiprocessing.current_process().pid}-{rank}, try_time = {try_time:.2f} sec") |
|
|
sleep(5) |
|
|
else: |
|
|
sleep(0.1) |
|
|
|
|
|
|
|
|
def save(self, images_node, params_seeds): |
|
|
|
|
|
max_num_images = self.kwargs['num_images'] |
|
|
|
|
|
with h5py.File(self.kwargs['save_direc_name'], 'a') as f: |
|
|
if 'kwargs' not in f.keys(): |
|
|
keys = list(self.kwargs) |
|
|
values = [str(value) for value in self.kwargs.values()] |
|
|
data = np.transpose(list((keys, values))) |
|
|
data = data.tolist() |
|
|
f.create_dataset('kwargs', data=data) |
|
|
|
|
|
if 'params' not in f.keys(): |
|
|
grp = f.create_group('params') |
|
|
grp['keys'] = list(self.params_ranges) |
|
|
grp.create_dataset( |
|
|
'values', |
|
|
data = params_seeds[:,:-1], |
|
|
maxshape = tuple((max_num_images,) + params_seeds[:,:-1].shape[1:]), |
|
|
) |
|
|
else: |
|
|
new_size = f['params']['values'].shape[0] + params_seeds.shape[0] |
|
|
f['params']['values'].resize(new_size, axis=0) |
|
|
f['params']['values'][-params_seeds.shape[0]:] = params_seeds[:,:-1] |
|
|
|
|
|
|
|
|
|
|
|
seeds = params_seeds[:,-1] |
|
|
if 'seeds' not in f.keys(): |
|
|
|
|
|
|
|
|
f.create_dataset( |
|
|
'seeds', |
|
|
data = seeds.astype(np.int64), |
|
|
|
|
|
maxshape = (max_num_images,), |
|
|
) |
|
|
else: |
|
|
new_size = f['seeds'].shape[0] + seeds.shape[0] |
|
|
f['seeds'].resize(new_size, axis=0) |
|
|
f['seeds'][-seeds.shape[0]:] = seeds.astype(np.int64) |
|
|
|
|
|
|
|
|
if 'redshifts_distances' not in f.keys() and 'redshifts_distances' in images_node: |
|
|
f.create_dataset('redshifts_distances', data=images_node['redshifts_distances']) |
|
|
|
|
|
for field in self.kwargs['fields']: |
|
|
images = images_node[field] |
|
|
if field not in f.keys(): |
|
|
f.create_dataset( |
|
|
field, |
|
|
data=images, |
|
|
maxshape= tuple((max_num_images,) + images.shape[1:]) |
|
|
) |
|
|
else: |
|
|
new_size = f[field].shape[0] + images.shape[0] |
|
|
f[field].resize(new_size, axis=0) |
|
|
f[field][-images.shape[0]:] = images |
|
|
|
|
|
if __name__ == '__main__': |
|
|
parser = argparse.ArgumentParser(description="generating 21cm dataset") |
|
|
parser.add_argument('--num_images', type=int, default=4) |
|
|
parser.add_argument('--BOX_LEN', type=int, default=64) |
|
|
parser.add_argument('--HII_DIM', type=int, default=128) |
|
|
parser.add_argument('--NON_CUBIC_FACTOR', type=int, default=1) |
|
|
parser.add_argument('--cpus_per_node', type=int, default=len(os.sched_getaffinity(0))) |
|
|
parser.add_argument('--save_direc', type=str, default='.') |
|
|
args = parser.parse_args() |
|
|
|
|
|
params_ranges = dict( |
|
|
ION_Tvir_MIN = 4.8, |
|
|
HII_EFF_FACTOR = 131.341, |
|
|
) |
|
|
|
|
|
kwargs = dict( |
|
|
num_images=args.num_images, |
|
|
fields = ['brightness_temp', 'density', 'xH_box'], |
|
|
BOX_LEN = args.BOX_LEN, |
|
|
HII_DIM = args.HII_DIM, |
|
|
verbose = 3, |
|
|
redshift = [7.51, 21.02], |
|
|
NON_CUBIC_FACTOR = args.NON_CUBIC_FACTOR, |
|
|
write = True, |
|
|
cpus_per_node = args.cpus_per_node, |
|
|
cache_rmdir = False, |
|
|
) |
|
|
|
|
|
now = datetime.datetime.now().strftime("%m%d-%H%M%S") |
|
|
save_name = f"LEN{kwargs['BOX_LEN']}-DIM{kwargs['HII_DIM']}-CUB{kwargs['NON_CUBIC_FACTOR']}-Tvir{params_ranges['ION_Tvir_MIN']}-zeta{params_ranges['HII_EFF_FACTOR']}-{now}.h5" |
|
|
kwargs['save_direc_name'] = os.path.join(args.save_direc, save_name) |
|
|
|
|
|
generator = Generator(params_ranges, **kwargs) |
|
|
generator.run() |
|
|
|
|
|
print(f"rank {rank} completed!") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|