|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from tqdm import tqdm |
|
|
from multiprocessing.dummy import Pool as ThreadPool |
|
|
from multiprocessing import cpu_count |
|
|
|
|
|
|
|
|
def parallel_threads(
    function,
    args,
    workers=0,
    star_args=False,
    kw_args=False,
    front_num=1,
    Pool=ThreadPool,
    **tqdm_kw
):
    """Map ``function`` over ``args`` in a worker pool, with a tqdm bar.

    The returned list is, in input order::

        [function(arg)      # default
         function(*arg)     # if star_args is true
         function(**arg)    # else if kw_args is true
         for arg in args]

    Args:
        function: callable applied to each element of ``args``.
        args: iterable of arguments; a one-shot iterator is accepted. When
            it does not support ``len()`` the progress bar has no total.
        workers: pool size. While the value is <= 0 it is increased by
            ``cpu_count()``, so 0 means "one worker per CPU". With exactly
            one worker every element is evaluated sequentially in the
            caller: no pool is created and no progress bar is shown.
        star_args: unpack each element as positional arguments.
        kw_args: unpack each element as keyword arguments. Only consulted
            when ``star_args`` is false.
        front_num: number of leading elements that are evaluated directly
            in the caller before the pool is started, which can be useful
            for debugging. Negative values are treated as 0.
        Pool: pool factory, called as ``Pool(workers)``. The result is used
            as a context manager and must provide ``imap``.
        **tqdm_kw: extra keyword arguments forwarded to ``tqdm``.

    Returns:
        list: the results of the front elements followed by the results of
        the pooled elements.
    """
    while workers <= 0:
        workers += cpu_count()

    if workers == 1:
        # One worker gains nothing from a pool: make the sequential "front"
        # swallow the whole input.
        front_num = float("inf")
    else:
        # A negative count would otherwise inflate the progress-bar total.
        front_num = max(front_num, 0)

    try:
        n_args_parallel = max(len(args) - front_num, 0)
    except TypeError:
        # Unsized iterable (e.g. a generator): tqdm runs without a total.
        n_args_parallel = None
    args = iter(args)

    def call(arg):
        # Apply ``function`` with the unpacking mode selected by the flags.
        if star_args:
            return function(*arg)
        if kw_args:
            return function(**arg)
        return function(arg)

    front = []
    while len(front) < front_num:
        try:
            arg = next(args)
        except StopIteration:
            # Input exhausted before the pool was needed.
            return front
        front.append(call(arg))

    if n_args_parallel == 0:
        # Sized input entirely consumed by the front: starting a pool (and,
        # with a process pool, spawning workers) would be pure overhead.
        return front

    with Pool(workers) as pool:
        # The wrappers are module-level functions taking one argument so
        # that they can be handed to ``imap``; the pairs are produced
        # lazily, like the plain branch which streams ``args`` directly.
        if star_args:
            results = pool.imap(starcall, ((function, arg) for arg in args))
        elif kw_args:
            results = pool.imap(starstarcall, ((function, arg) for arg in args))
        else:
            results = pool.imap(function, args)
        # Drain the results before leaving the ``with`` block; for
        # multiprocessing pools, exiting the block terminates the workers.
        out = list(tqdm(results, total=n_args_parallel, **tqdm_kw))
    return front + out
|
|
|
|
|
|
|
|
def parallel_processes(*args, **kwargs):
    """Run ``parallel_threads`` backed by a process pool.

    All positional and keyword arguments are forwarded unchanged, except
    that ``Pool`` is always set to ``multiprocessing.Pool`` (replacing any
    value supplied by the caller).
    """
    from multiprocessing import Pool as ProcessPool

    kwargs.update(Pool=ProcessPool)
    return parallel_threads(*args, **kwargs)
|
|
|
|
|
|
|
|
def starcall(args):
    """Call a function with positional arguments packed in a single pair.

    ``args`` is a two-item ``(function, positional_args)`` iterable; the
    result of ``function(*positional_args)`` is returned. Taking a single
    parameter lets this be used as the callable given to a pool's ``imap``.
    """
    func, positional = args
    return func(*positional)
|
|
|
|
|
|
|
|
def starstarcall(args):
    """Call a function with keyword arguments packed in a single pair.

    ``args`` is a two-item ``(function, keyword_args)`` iterable; the
    result of ``function(**keyword_args)`` is returned. Taking a single
    parameter lets this be used as the callable given to a pool's ``imap``.
    """
    func, keywords = args
    return func(**keywords)
|
|
|