from __future__ import absolute_import, division, print_function

import os
import warnings
from random import random
from time import sleep
from uuid import uuid4

import pytest

from .. import Parallel, delayed, parallel_backend, parallel_config
from .._dask import DaskDistributedBackend
from ..parallel import AutoBatchingMixin, ThreadingBackend
from .common import np, with_numpy
from .test_parallel import (
    _recursive_backend_info,
    _test_deadlock_with_generator,
    _test_parallel_unordered_generator_returns_fastest_first,
)

distributed = pytest.importorskip("distributed")
dask = pytest.importorskip("dask")

# These imports are placed after the importorskip calls on purpose, so that
# the whole module is skipped when dask or distributed is not installed.
from distributed import Client, LocalCluster, get_client
from distributed.metrics import time

# `cleanup` is imported only for its pytest fixture side effect.
from distributed.utils_test import cleanup, cluster, inc  # noqa: F401


@pytest.fixture(scope="function", autouse=True)
def avoid_dask_env_leaks(tmp_path):
    # The dask backend may set thread-limiting environment variables (for
    # instance OMP_NUM_THREADS) in the test process. Snapshot their values
    # before each test and restore them afterwards so that no test leaks
    # environment state into the others.
    from joblib._parallel_backends import ParallelBackendBase

    old_value = {
        k: os.environ.get(k) for k in ParallelBackendBase.MAX_NUM_THREADS_VARS
    }
    yield

    # Restore the environment to its pre-test state.
    for k, v in old_value.items():
        if v is None:
            os.environ.pop(k, None)
        else:
            os.environ[k] = v


def noop(*args, **kwargs):
    pass


def slow_raise_value_error(condition, duration=0.05):
    sleep(duration)
    if condition:
        raise ValueError("condition evaluated to True")


def count_events(event_name, client):
    # Fetch the event log of every worker and count, per worker address, the
    # events whose name matches `event_name`.
    worker_events = client.run(lambda dask_worker: dask_worker.log)
    event_counts = {}
    for w, events in worker_events.items():
        event_counts[w] = len(
            [event for event in list(events) if event[1] == event_name]
        )
    return event_counts
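

# For reference, count_events returns a mapping from worker address to event
# count; a minimal sketch of its output (the addresses are made up):
#
#     {"tcp://127.0.0.1:40001": 2, "tcp://127.0.0.1:40002": 0}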


def test_simple(loop):
    with cluster() as (s, [a, b]):
        with Client(s["address"], loop=loop) as client:
            with parallel_config(backend="dask"):
                seq = Parallel()(delayed(inc)(i) for i in range(10))
                assert seq == [inc(i) for i in range(10)]

                with pytest.raises(ValueError):
                    Parallel()(
                        delayed(slow_raise_value_error)(i == 3) for i in range(10)
                    )

                seq = Parallel()(delayed(inc)(i) for i in range(10))
                assert seq == [inc(i) for i in range(10)]
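

# test_simple exercises the canonical usage pattern of the dask backend; a
# minimal sketch (assuming a distributed.Client is already running) is:
#
#     with parallel_config(backend="dask"):
#         results = Parallel()(delayed(f)(x) for x in data)
#
# Task failures surface as the original exception, and the same context can
# be reused for further Parallel calls.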


def test_dask_backend_uses_autobatching(loop):
    assert (
        DaskDistributedBackend.compute_batch_size
        is AutoBatchingMixin.compute_batch_size
    )

    with cluster() as (s, [a, b]):
        with Client(s["address"], loop=loop) as client:
            with parallel_config(backend="dask"):
                with Parallel() as parallel:
                    # The dask backend starts with batches of a single task.
                    backend = parallel._backend
                    assert isinstance(backend, DaskDistributedBackend)
                    assert backend.parallel is parallel
                    assert backend._effective_batch_size == 1

                    # Submitting many very short tasks should make the
                    # auto-batching heuristic increase the batch size.
                    parallel(delayed(lambda: None)() for _ in range(int(1e4)))
                    assert backend._effective_batch_size > 10
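

# The auto-batching behavior checked above comes from AutoBatchingMixin: when
# completed batches run much faster than the targeted batch duration, the
# mixin grows the batch size, amortizing scheduling overhead for very short
# tasks (the exact target durations are implementation details of joblib).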


@pytest.mark.parametrize("n_jobs", [2, -1])
@pytest.mark.parametrize("context", [parallel_config, parallel_backend])
def test_parallel_unordered_generator_returns_fastest_first_with_dask(
    n_jobs, context
):
    with distributed.Client(n_workers=2, threads_per_worker=2), context("dask"):
        _test_parallel_unordered_generator_returns_fastest_first(None, n_jobs)


@with_numpy
@pytest.mark.parametrize("n_jobs", [2, -1])
@pytest.mark.parametrize("return_as", ["generator", "generator_unordered"])
@pytest.mark.parametrize("context", [parallel_config, parallel_backend])
def test_deadlock_with_generator_and_dask(context, return_as, n_jobs):
    with distributed.Client(n_workers=2, threads_per_worker=2), context("dask"):
        _test_deadlock_with_generator(None, return_as, n_jobs)


@with_numpy
@pytest.mark.parametrize("context", [parallel_config, parallel_backend])
def test_nested_parallelism_with_dask(context):
    with distributed.Client(n_workers=2, threads_per_worker=2):
        # Pass 10 MB of data as an argument to trigger implicit scattering.
        data = np.ones(int(1e7), dtype=np.uint8)
        for i in range(2):
            with context("dask"):
                backend_types_and_levels = _recursive_backend_info(data=data)
            assert len(backend_types_and_levels) == 4
            assert all(
                name == "DaskDistributedBackend"
                for name, _ in backend_types_and_levels
            )

        # Same check without passing any data.
        with context("dask"):
            backend_types_and_levels = _recursive_backend_info()
        assert len(backend_types_and_levels) == 4
        assert all(
            name == "DaskDistributedBackend" for name, _ in backend_types_and_levels
        )


def random2():
    # Module-level wrapper around `random` used as a task function below.
    return random()


def test_dont_assume_function_purity(loop):
    with cluster() as (s, [a, b]):
        with Client(s["address"], loop=loop) as client:
            with parallel_config(backend="dask"):
                # Two calls to the same zero-argument function must not be
                # collapsed into a single task result.
                x, y = Parallel()(delayed(random2)() for i in range(2))
                assert x != y
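

# Dask deduplicates tasks by key, so two submissions of an identical call
# could in principle be collapsed into one. Current joblib versions avoid
# this by generating unique keys per submitted batch, which is what the test
# above relies on to observe two different random draws.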


@pytest.mark.parametrize("mixed", [True, False])
def test_dask_funcname(loop, mixed):
    from joblib._dask import Batch

    if not mixed:
        tasks = [delayed(inc)(i) for i in range(4)]
        batch_repr = "batch_of_inc_4_calls"
    else:
        tasks = [delayed(abs)(i) if i % 2 else delayed(inc)(i) for i in range(4)]
        batch_repr = "mixed_batch_of_inc_4_calls"

    assert repr(Batch(tasks)) == batch_repr

    with cluster() as (s, [a, b]):
        with Client(s["address"], loop=loop) as client:
            with parallel_config(backend="dask"):
                _ = Parallel(batch_size=2, pre_dispatch="all")(tasks)

            def f(dask_scheduler):
                return list(dask_scheduler.transition_log)

            # Every key in the scheduler log should embed the batch name.
            log = client.run_on_scheduler(f)
            assert all("batch_of_inc" in tup[0] for tup in log)


def test_no_undesired_distributed_cache_hit():
    # Dask maintains a cache of recently pickled callables. The dask backend
    # used to wrap both the function and its arguments in a single Batch
    # callable, which could lead to spurious cache hits where stale arguments
    # were reused. The backend now keeps the arguments out of the Batch
    # instances; this test is the non-regression check for that behavior.

    # Use a large number of input arguments to give the auto-batching
    # heuristic enough tasks to kick in.
    lists = [[] for _ in range(100)]
    np = pytest.importorskip("numpy")
    X = np.arange(int(1e6))

    def isolated_operation(list_, data=None):
        if data is not None:
            np.testing.assert_array_equal(data, X)
        list_.append(uuid4().hex)
        return list_

    cluster = LocalCluster(n_workers=1, threads_per_worker=2)
    client = Client(cluster)
    try:
        with parallel_config(backend="dask"):
            res = Parallel()(delayed(isolated_operation)(list_) for list_ in lists)

        # The original input lists must not have been mutated: the mutations
        # happened on deserialized copies inside the workers.
        assert lists == [[] for _ in range(100)]

        # No large array was passed to isolated_operation, so no scattering
        # should have happened under the hood.
        counts = count_events("receive-from-scatter", client)
        assert sum(counts.values()) == 0
        assert all(len(r) == 1 for r in res)

        with parallel_config(backend="dask"):
            # This time, pass a large array that dask should scatter
            # automatically.
            res = Parallel()(
                delayed(isolated_operation)(list_, data=X) for list_ in lists
            )

        # Auto-scattering should have kicked in.
        counts = count_events("receive-from-scatter", client)
        assert sum(counts.values()) > 0
        assert all(len(r) == 1 for r in res)
    finally:
        client.close(timeout=30)
        cluster.close(timeout=30)


class CountSerialized(object):
    def __init__(self, x):
        self.x = x
        self.count = 0

    def __add__(self, other):
        return self.x + getattr(other, "x", other)

    __radd__ = __add__

    def __reduce__(self):
        # Each pickling round-trip goes through __reduce__, so `count`
        # records how many times the local instance has been serialized
        # (unpickled copies start back at zero).
        self.count += 1
        return (CountSerialized, (self.x,))
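

# A minimal sketch of how CountSerialized tracks serializations:
#
#     import pickle
#     obj = CountSerialized(1)
#     pickle.loads(pickle.dumps(obj))
#     assert obj.count == 1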


def add5(a, b, c, d=0, e=0):
    return a + b + c + d + e


def test_manual_scatter(loop):
    # Check that the `scatter` keyword of the dask backend pre-scatters the
    # listed objects to the workers, so that they are serialized a fixed
    # number of times instead of once per task. The second half of the test
    # replays the same workload with the raw distributed API and compares
    # serialization counts. w, x and y are scattered; z is passed inline.
    w, x, y, z = (CountSerialized(i) for i in range(4))

    f = delayed(add5)
    tasks = [f(x, y, z, d=4, e=5) for _ in range(10)]
    tasks += [
        f(x, z, y, d=5, e=4),
        f(y, x, z, d=x, e=5),
        f(z, z, x, d=z, e=y),
    ]
    expected = [func(*args, **kwargs) for func, args, kwargs in tasks]

    with cluster() as (s, _):
        with Client(s["address"], loop=loop) as client:
            with parallel_config(backend="dask", scatter=[w, x, y]):
                results_parallel = Parallel(batch_size=1)(tasks)
                assert results_parallel == expected

    # `scatter` must be a list or a tuple.
    with pytest.raises(TypeError):
        with parallel_config(backend="dask", loop=loop, scatter=1):
            pass

    # The scattered objects should all have been serialized the same number
    # of times, independently of how many tasks reference them.
    n_serialization_scatter_with_parallel = w.count
    assert x.count == n_serialization_scatter_with_parallel
    assert y.count == n_serialization_scatter_with_parallel
    n_serialization_with_parallel = z.count

    # Reset the counters before replaying the workload with the raw
    # distributed API.
    for var in (w, x, y, z):
        var.count = 0

    with cluster() as (s, _):
        with Client(s["address"], loop=loop) as client:
            scattered = {}
            for obj in w, x, y:
                scattered[id(obj)] = client.scatter(obj, broadcast=True)
            results_native = [
                client.submit(
                    func,
                    *(scattered.get(id(arg), arg) for arg in args),
                    **{
                        key: scattered.get(id(value), value)
                        for (key, value) in kwargs.items()
                    },
                    key=str(uuid4()),
                ).result()
                for (func, args, kwargs) in tasks
            ]
            assert results_native == expected

    # Scattering through the backend should cost the same number of
    # serializations as scattering manually with client.scatter.
    n_serialization_scatter_native = w.count
    assert x.count == n_serialization_scatter_native
    assert y.count == n_serialization_scatter_native

    assert n_serialization_scatter_with_parallel == n_serialization_scatter_native

    distributed_version = tuple(int(v) for v in distributed.__version__.split("."))
    if distributed_version < (2023, 4):
        # Older versions of distributed serialized the inline (non-scattered)
        # argument one extra time when submitting the tasks.
        assert z.count == n_serialization_with_parallel + 1
    else:
        assert z.count == n_serialization_with_parallel


# Implicit ("auto") scattering of large, repeatedly used arguments.
def test_auto_scatter(loop_in_thread):
    np = pytest.importorskip("numpy")
    data1 = np.ones(int(1e4), dtype=np.uint8)
    data2 = np.ones(int(1e4), dtype=np.uint8)
    data_to_process = ([data1] * 3) + ([data2] * 3)

    with cluster() as (s, [a, b]):
        with Client(s["address"], loop=loop_in_thread) as client:
            with parallel_config(backend="dask"):
                # Passing the same large array to several tasks, both as
                # positional and keyword argument, should trigger a single
                # scatter operation per array.
                Parallel()(
                    delayed(noop)(data, data, i, opt=data)
                    for i, data in enumerate(data_to_process)
                )
            # Two distinct large arrays were used, so exactly two pieces of
            # data should have been received through the scatter mechanism,
            # whichever workers ended up hosting them.
            counts = count_events("receive-from-scatter", client)
            assert counts[a["address"]] + counts[b["address"]] == 2

    with cluster() as (s, [a, b]):
        with Client(s["address"], loop=loop_in_thread) as client:
            with parallel_config(backend="dask"):
                # Small data is embedded directly in the task definitions:
                # no scattering should happen.
                Parallel()(delayed(noop)(data1[:3], i) for i in range(5))
            counts = count_events("receive-from-scatter", client)
            assert counts[a["address"]] == 0
            assert counts[b["address"]] == 0
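

# The implicit scattering tested above is a size-based heuristic of the dask
# backend: sufficiently large (and weakref-able) arguments are scattered and
# replaced by futures, while small ones travel inside the task itself. The
# exact size threshold is an implementation detail of joblib._dask.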


@pytest.mark.parametrize("retry_no", list(range(2)))
def test_nested_scatter(loop, retry_no):
    # Run the same nested workload twice (retry_no) to make sure no state
    # leaks from one run into the next.
    np = pytest.importorskip("numpy")

    NUM_INNER_TASKS = 10
    NUM_OUTER_TASKS = 10

    def my_sum(x, i, j):
        return np.sum(x)

    def outer_function_joblib(array, i):
        client = get_client()
        with parallel_config(backend="dask"):
            results = Parallel()(
                delayed(my_sum)(array[j:], i, j) for j in range(NUM_INNER_TASKS)
            )
        return sum(results)

    with cluster() as (s, [a, b]):
        with Client(s["address"], loop=loop) as _:
            with parallel_config(backend="dask"):
                my_array = np.ones(10000)
                _ = Parallel()(
                    delayed(outer_function_joblib)(my_array[i:], i)
                    for i in range(NUM_OUTER_TASKS)
                )
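

# Inside a task running on a dask worker, distributed.get_client() returns
# the client connected to the current cluster, which is what lets the nested
# parallel_config(backend="dask") block above submit sub-tasks from within a
# task.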


def test_nested_backend_context_manager(loop_in_thread):
    def get_nested_pids():
        pids = set(Parallel(n_jobs=2)(delayed(os.getpid)() for _ in range(2)))
        pids |= set(Parallel(n_jobs=2)(delayed(os.getpid)() for _ in range(2)))
        return pids

    with cluster() as (s, [a, b]):
        with Client(s["address"], loop=loop_in_thread) as client:
            with parallel_config(backend="dask"):
                pid_groups = Parallel(n_jobs=2)(
                    delayed(get_nested_pids)() for _ in range(10)
                )
                for pid_group in pid_groups:
                    assert len(set(pid_group)) <= 2

        # The same work should run without deadlock on a second client
        # connected to the same scheduler.
        with Client(s["address"], loop=loop_in_thread) as client:
            with parallel_config(backend="dask"):
                pid_groups = Parallel(n_jobs=2)(
                    delayed(get_nested_pids)() for _ in range(10)
                )
                for pid_group in pid_groups:
                    assert len(set(pid_group)) <= 2


def test_nested_backend_context_manager_implicit_n_jobs(loop):
    # Check that Parallel instances with no explicit n_jobs value
    # automatically select all the dask workers, including in nested calls.

    def _backend_type(p):
        return p._backend.__class__.__name__

    def get_nested_implicit_n_jobs():
        with Parallel() as p:
            return _backend_type(p), p.n_jobs

    with cluster() as (s, [a, b]):
        with Client(s["address"], loop=loop) as client:
            with parallel_config(backend="dask"):
                with Parallel() as p:
                    assert _backend_type(p) == "DaskDistributedBackend"
                    assert p.n_jobs == -1
                    all_nested_n_jobs = p(
                        delayed(get_nested_implicit_n_jobs)() for _ in range(2)
                    )
                for backend_type, nested_n_jobs in all_nested_n_jobs:
                    assert backend_type == "DaskDistributedBackend"
                    assert nested_n_jobs == -1
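

# With the dask backend, n_jobs == -1 does not pin a worker count: the actual
# parallelism is determined by the dask cluster itself, and nested Parallel
# calls inherit the same backend and the same implicit n_jobs value.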


def test_errors(loop):
    # Requesting the dask backend without a reachable dask client raises a
    # ValueError with an actionable message.
    with pytest.raises(ValueError) as info:
        with parallel_config(backend="dask"):
            pass

    assert "create a dask client" in str(info.value).lower()


def test_correct_nested_backend(loop):
    with cluster() as (s, [a, b]):
        with Client(s["address"], loop=loop) as client:
            # No requirement: the nested backend should also be dask.
            with parallel_config(backend="dask"):
                result = Parallel(n_jobs=2)(
                    delayed(outer)(nested_require=None) for _ in range(1)
                )
                assert isinstance(result[0][0][0], DaskDistributedBackend)

            # Requiring shared memory at the innermost level should fall
            # back to the threading backend.
            with parallel_config(backend="dask"):
                result = Parallel(n_jobs=2)(
                    delayed(outer)(nested_require="sharedmem") for _ in range(1)
                )
                assert isinstance(result[0][0][0], ThreadingBackend)


def outer(nested_require):
    # outer/middle/inner are module-level functions so that they can be
    # pickled and shipped to the dask workers.
    return Parallel(n_jobs=2, prefer="threads")(
        delayed(middle)(nested_require) for _ in range(1)
    )


def middle(require):
    return Parallel(n_jobs=2, require=require)(delayed(inner)() for _ in range(1))


def inner():
    return Parallel()._backend


def test_secede_with_no_processes(loop):
    # Even on a process-less (in-thread) cluster, asking for more jobs than
    # there are workers must not deadlock: the backend relies on
    # distributed's secede/rejoin mechanism to free blocked worker threads.
    with Client(loop=loop, processes=False, set_as_default=True):
        with parallel_config(backend="dask"):
            Parallel(n_jobs=4)(delayed(id)(i) for i in range(2))
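

# For background, distributed's secede()/rejoin() let a task temporarily
# leave the worker's thread pool while it waits on sub-tasks; a sketch of the
# pattern (independent of joblib) is:
#
#     from distributed import secede, rejoin, get_client
#
#     def task_that_waits():
#         futures = get_client().map(inc, range(10))
#         secede()  # free the thread for other tasks
#         results = get_client().gather(futures)
#         rejoin()  # take a thread slot back before finishing
#         return sum(results)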


def _worker_address(_):
    from distributed import get_worker

    return get_worker().address


def test_dask_backend_keywords(loop):
    # Extra keyword arguments passed to parallel_config(backend="dask") are
    # forwarded to the underlying submit calls; `workers` pins the tasks to
    # the given worker address.
    with cluster() as (s, [a, b]):
        with Client(s["address"], loop=loop) as client:
            with parallel_config(backend="dask", workers=a["address"]):
                seq = Parallel()(delayed(_worker_address)(i) for i in range(10))
                assert seq == [a["address"]] * 10

            with parallel_config(backend="dask", workers=b["address"]):
                seq = Parallel()(delayed(_worker_address)(i) for i in range(10))
                assert seq == [b["address"]] * 10
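

# `workers` is distributed.Client.submit's own keyword for restricting where
# a task may run; other submit() keywords (e.g. resources) should be
# forwarded the same way, though only `workers` is exercised in this test.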


def test_scheduler_tasks_cleanup(loop):
    with Client(processes=False, loop=loop) as client:
        with parallel_config(backend="dask"):
            Parallel()(delayed(inc)(i) for i in range(10))

        # Once the Parallel call has returned, the scheduler should
        # eventually release all its tasks (give it 5 seconds).
        start = time()
        while client.cluster.scheduler.tasks:
            sleep(0.01)
            assert time() < start + 5

        assert not client.futures


@pytest.mark.parametrize("cluster_strategy", ["adaptive", "late_scaling"])
@pytest.mark.skipif(
    distributed.__version__ <= "2.1.1" and distributed.__version__ >= "1.28.0",
    reason="distributed bug - https://github.com/dask/distributed/pull/2841",
)
def test_wait_for_workers(cluster_strategy):
    cluster = LocalCluster(n_workers=0, processes=False, threads_per_worker=2)
    client = Client(cluster)
    if cluster_strategy == "adaptive":
        cluster.adapt(minimum=0, maximum=2)
    elif cluster_strategy == "late_scaling":
        # Scaling is asynchronous: the workers only come up while the tasks
        # submitted below are already pending.
        cluster.scale(2)
    try:
        with parallel_config(backend="dask"):
            # The backend should wait for at least one worker to be up
            # before dispatching, instead of failing immediately.
            Parallel()(delayed(inc)(i) for i in range(10))
    finally:
        client.close()
        cluster.close()


def test_wait_for_workers_timeout():
    # Start a cluster with no worker at all.
    cluster = LocalCluster(n_workers=0, processes=False, threads_per_worker=2)
    client = Client(cluster)
    try:
        with parallel_config(backend="dask", wait_for_workers_timeout=0.1):
            # The backend waits 0.1 s for workers, then raises an informative
            # TimeoutError.
            msg = "DaskDistributedBackend has no worker after 0.1 seconds."
            with pytest.raises(TimeoutError, match=msg):
                Parallel()(delayed(inc)(i) for i in range(10))

        with parallel_config(backend="dask", wait_for_workers_timeout=0):
            # With a timeout of 0, the backend does not wait at all and
            # fails immediately with a RuntimeError.
            msg = "DaskDistributedBackend has no active worker"
            with pytest.raises(RuntimeError, match=msg):
                Parallel()(delayed(inc)(i) for i in range(10))
    finally:
        client.close()
        cluster.close()
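

# wait_for_workers_timeout therefore has three regimes: a positive value
# bounds how long the backend polls for workers before raising TimeoutError,
# zero disables waiting entirely (RuntimeError if no worker is up), and the
# default (10 seconds at the time of writing) covers slow-starting clusters.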


@pytest.mark.parametrize("backend", ["loky", "multiprocessing"])
def test_joblib_warning_inside_dask_daemonic_worker(backend):
    cluster = LocalCluster(n_workers=2)
    client = Client(cluster)
    try:

        def func_using_joblib_parallel():
            # Process-based joblib backends cannot start child processes
            # from within a daemonic dask worker; joblib should emit a
            # single warning pointing at the distributed.worker.daemon
            # setting.
            with warnings.catch_warnings(record=True) as record:
                Parallel(n_jobs=2, backend=backend)(
                    delayed(inc)(i) for i in range(10)
                )

            return record

        fut = client.submit(func_using_joblib_parallel)
        record = fut.result()

        assert len(record) == 1
        warning = record[0].message
        assert isinstance(warning, UserWarning)
        assert "distributed.worker.daemon" in str(warning)
    finally:
        client.close(timeout=30)
        cluster.close(timeout=30)