import faulthandler
import gc
import itertools
import mmap
import os
import pickle
import platform
import subprocess
import sys
import threading
from time import sleep

import pytest

import joblib._memmapping_reducer as jmr
from joblib._memmapping_reducer import (
    ArrayMemmapForwardReducer,
    _get_backing_memmap,
    _get_temp_dir,
    _strided_from_memmap,
    _WeakArrayKeyMap,
    has_shareable_memory,
)
from joblib.backports import make_memmap
from joblib.executor import _TestingMemmappingExecutor as TestExecutor
from joblib.parallel import Parallel, delayed
from joblib.pool import MemmappingPool
from joblib.test.common import (
    IS_GIL_DISABLED,
    np,
    with_dev_shm,
    with_multiprocessing,
    with_numpy,
)
from joblib.testing import parametrize, raises, skipif

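# Watchdog for this test module: if the tests take more than 300 seconds,
# dump the tracebacks of all threads and exit, so that deadlocks in the
# worker management code are easier to diagnose. The teardown cancels it.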
def setup_module():
    faulthandler.dump_traceback_later(timeout=300, exit=True)


def teardown_module():
    faulthandler.cancel_dump_traceback_later()

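# Helper run in worker processes: assert that the received array is backed by
# a memmap before sending it back to the parent process.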
def check_memmap_and_send_back(array):
    assert _get_backing_memmap(array) is not None
    return array

def check_array(args):
    """Dummy helper function to be executed in subprocesses

    Check that the provided array has the expected values in the provided
    range.
    """
    data, position, expected = args
    np.testing.assert_array_equal(data[position], expected)

def inplace_double(args):
    """Dummy helper function to be executed in subprocesses

    Check that the input array has the expected value at the provided
    position and perform an in-place modification that doubles that value.
    """
    data, position, expected = args
    assert data[position] == expected
    data[position] *= 2
    np.testing.assert_array_equal(data[position], 2 * expected)

def test_memmap_based_array_reducing(tmpdir):
    """Check that it is possible to reduce a memmap backed array"""
    assert_array_equal = np.testing.assert_array_equal
    filename = tmpdir.join("test.mmap").strpath

    # Create a file larger than what will be used by the memmapped arrays
    # defined below
    buffer = np.memmap(filename, dtype=np.float64, shape=500, mode="w+")

    # Fill the original buffer with negative markers to detect over- or
    # underflow in case of test failures
    buffer[:] = -1.0 * np.arange(buffer.shape[0], dtype=buffer.dtype)
    buffer.flush()

    # Memmap a 3D fortran-ordered array on an offset subsection of the
    # previous buffer
    a = np.memmap(
        filename, dtype=np.float64, shape=(3, 5, 4), mode="r+", order="F", offset=4
    )
    a[:] = np.arange(60).reshape(a.shape)

    # Build various views that share the buffer with the original memmap

    # b is a strided view on a memmap instance
    b = a[1:-1, 2:-1, 2:4]

    # b2 is a 2d memmap with a 1d memmap as base
    # non-regression test for https://github.com/joblib/joblib/issues/1703
    b2 = buffer.reshape(10, 50)

    # c and d are plain ndarray views sharing the memmap buffer
    c = np.asarray(b)
    d = c.T

    # Array reducer with auto dumping disabled
    reducer = ArrayMemmapForwardReducer(None, tmpdir.strpath, "c", True)

    def reconstruct_array_or_memmap(x):
        cons, args = reducer(x)
        return cons(*args)

    # Reconstruct original memmap
    a_reconstructed = reconstruct_array_or_memmap(a)
    assert has_shareable_memory(a_reconstructed)
    assert isinstance(a_reconstructed, np.memmap)
    assert_array_equal(a_reconstructed, a)

    # Reconstruct strided memmap view
    b_reconstructed = reconstruct_array_or_memmap(b)
    assert has_shareable_memory(b_reconstructed)
    assert_array_equal(b_reconstructed, b)

    # Reconstruct 2d memmap with a 1d memmap as base
    b2_reconstructed = reconstruct_array_or_memmap(b2)
    assert has_shareable_memory(b2_reconstructed)
    assert_array_equal(b2_reconstructed, b2)

    # Reconstruct array views on memmap base
    c_reconstructed = reconstruct_array_or_memmap(c)
    assert not isinstance(c_reconstructed, np.memmap)
    assert has_shareable_memory(c_reconstructed)
    assert_array_equal(c_reconstructed, c)

    d_reconstructed = reconstruct_array_or_memmap(d)
    assert not isinstance(d_reconstructed, np.memmap)
    assert has_shareable_memory(d_reconstructed)
    assert_array_equal(d_reconstructed, d)

    # Test graceful degradation on fake memmap instances with in-memory
    # buffers
    a3 = a * 3
    assert not has_shareable_memory(a3)
    a3_reconstructed = reconstruct_array_or_memmap(a3)
    assert not has_shareable_memory(a3_reconstructed)
    assert not isinstance(a3_reconstructed, np.memmap)
    assert_array_equal(a3_reconstructed, a * 3)

    # Test graceful degradation on arrays derived from fake memmap instances
    b3 = np.asarray(a3)
    assert not has_shareable_memory(b3)

    b3_reconstructed = reconstruct_array_or_memmap(b3)
    assert isinstance(b3_reconstructed, np.ndarray)
    assert not has_shareable_memory(b3_reconstructed)
    assert_array_equal(b3_reconstructed, b3)

def test_resource_tracker_retries_when_permissionerror(tmpdir):
    # Test the retry mechanism of the resource_tracker when unlinking memmaps.
    # See the ``unlink_file`` documentation in joblib for more thorough
    # information.
    filename = tmpdir.join("test.mmap").strpath
    cmd = """if 1:
        import os
        import numpy as np
        import time

        from joblib.externals.loky.backend import resource_tracker
        resource_tracker.VERBOSE = 1

        # Start the resource tracker
        resource_tracker.ensure_running()
        time.sleep(1)

        # Create a file containing numpy data
        memmap = np.memmap(r"{filename}", dtype=np.float64, shape=10, mode='w+')
        memmap[:] = np.arange(10).astype(np.int8).data
        memmap.flush()
        assert os.path.exists(r"{filename}")
        del memmap

        # Create a np.memmap backed by this file
        memmap = np.memmap(r"{filename}", dtype=np.float64, shape=10, mode='w+')
        resource_tracker.register(r"{filename}", "file")

        # Ask the resource_tracker to delete the file backing the np.memmap:
        # this should raise a PermissionError that the resource_tracker will
        # log.
        resource_tracker.maybe_unlink(r"{filename}", "file")

        # Wait for the resource_tracker to process the maybe_unlink before
        # cleaning up the memmap
        time.sleep(2)
    """.format(filename=filename)
    p = subprocess.Popen(
        [sys.executable, "-c", cmd], stderr=subprocess.PIPE, stdout=subprocess.PIPE
    )
    p.wait()
    out, err = p.communicate()
    assert p.returncode == 0, err.decode()
    assert out == b""
    msg = "tried to unlink {}, got PermissionError".format(filename)
    assert msg in err.decode()

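# Check that reducing memmaps with more than two dimensions, as well as
# various slices and index-based views on them, preserves both the data and
# the shared-memory backing.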
def test_high_dimension_memmap_array_reducing(tmpdir):
    assert_array_equal = np.testing.assert_array_equal

    filename = tmpdir.join("test.mmap").strpath

    # Create a high dimensional memmap
    a = np.memmap(filename, dtype=np.float64, shape=(100, 15, 15, 3), mode="w+")
    a[:] = np.arange(100 * 15 * 15 * 3).reshape(a.shape)

    # Create some slices/indices at various dimensions
    b = a[0:10]
    c = a[:, 5:10]
    d = a[:, :, :, 0]
    e = a[1:3:4]

    # Array reducer with auto dumping disabled
    reducer = ArrayMemmapForwardReducer(None, tmpdir.strpath, "c", True)

    def reconstruct_array_or_memmap(x):
        cons, args = reducer(x)
        return cons(*args)

    a_reconstructed = reconstruct_array_or_memmap(a)
    assert has_shareable_memory(a_reconstructed)
    assert isinstance(a_reconstructed, np.memmap)
    assert_array_equal(a_reconstructed, a)

    b_reconstructed = reconstruct_array_or_memmap(b)
    assert has_shareable_memory(b_reconstructed)
    assert_array_equal(b_reconstructed, b)

    c_reconstructed = reconstruct_array_or_memmap(c)
    assert has_shareable_memory(c_reconstructed)
    assert_array_equal(c_reconstructed, c)

    d_reconstructed = reconstruct_array_or_memmap(d)
    assert has_shareable_memory(d_reconstructed)
    assert_array_equal(d_reconstructed, d)

    e_reconstructed = reconstruct_array_or_memmap(e)
    assert has_shareable_memory(e_reconstructed)
    assert_array_equal(e_reconstructed, e)

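# Check that _strided_from_memmap can rebuild both a plain memmap and a
# strided ndarray view backed by a memmap, honoring the requested offset.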
def test__strided_from_memmap(tmpdir):
    fname = tmpdir.join("test.mmap").strpath
    size = 5 * mmap.ALLOCATIONGRANULARITY
    offset = mmap.ALLOCATIONGRANULARITY + 1

    # This line creates the mmap file that is reused later
    memmap_obj = np.memmap(fname, mode="w+", shape=size + offset)

    # filename, dtype, mode, offset, order, shape, strides, total_buffer_len
    memmap_obj = _strided_from_memmap(
        fname,
        dtype="uint8",
        mode="r",
        offset=offset,
        order="C",
        shape=size,
        strides=None,
        total_buffer_len=None,
        unlink_on_gc_collect=False,
    )
    assert isinstance(memmap_obj, np.memmap)
    assert memmap_obj.offset == offset

    memmap_backed_obj = _strided_from_memmap(
        fname,
        dtype="uint8",
        mode="r",
        offset=offset,
        order="C",
        shape=(size // 2,),
        strides=(2,),
        total_buffer_len=size,
        unlink_on_gc_collect=False,
    )
    assert _get_backing_memmap(memmap_backed_obj).offset == offset

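# In the pool-based tests below, ``factory`` is expected to build a pool-like
# object that uses joblib's memmapping reducers, such as MemmappingPool or the
# loky-based _TestingMemmappingExecutor imported above.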
def test_pool_with_memmap(factory, tmpdir):
    """Check that subprocess can access and update shared memory memmap"""
    assert_array_equal = np.testing.assert_array_equal

    # Fork the subprocess before allocating the objects to be passed
    pool_temp_folder = tmpdir.mkdir("pool").strpath
    p = factory(10, max_nbytes=2, temp_folder=pool_temp_folder)
    try:
        filename = tmpdir.join("test.mmap").strpath
        a = np.memmap(filename, dtype=np.float32, shape=(3, 5), mode="w+")
        a.fill(1.0)

        p.map(
            inplace_double,
            [(a, (i, j), 1.0) for i in range(a.shape[0]) for j in range(a.shape[1])],
        )

        assert_array_equal(a, 2 * np.ones(a.shape))

        # Open a copy-on-write view on the previous data
        b = np.memmap(filename, dtype=np.float32, shape=(5, 3), mode="c")

        p.map(
            inplace_double,
            [(b, (i, j), 2.0) for i in range(b.shape[0]) for j in range(b.shape[1])],
        )

        # Passing memmap instances to the pool should not trigger the creation
        # of new files on the FS
        assert os.listdir(pool_temp_folder) == []

        # the original data is untouched
        assert_array_equal(a, 2 * np.ones(a.shape))
        assert_array_equal(b, 2 * np.ones(b.shape))

        # readonly maps can be read but not updated
        c = np.memmap(filename, dtype=np.float32, shape=(10,), mode="r", offset=5 * 4)

        with raises(AssertionError):
            p.map(check_array, [(c, i, 3.0) for i in range(c.shape[0])])

        # depending on the version of numpy one can either get a RuntimeError
        # or a ValueError
        with raises((RuntimeError, ValueError)):
            p.map(inplace_double, [(c, i, 2.0) for i in range(c.shape[0])])
    finally:
        # Clean up all file handles held by the pool
        p.terminate()
        del p

def test_pool_with_memmap_array_view(factory, tmpdir):
    """Check that subprocess can access and update shared memory array"""
    assert_array_equal = np.testing.assert_array_equal

    # Fork the subprocess before allocating the objects to be passed
    pool_temp_folder = tmpdir.mkdir("pool").strpath
    p = factory(10, max_nbytes=2, temp_folder=pool_temp_folder)
    try:
        filename = tmpdir.join("test.mmap").strpath
        a = np.memmap(filename, dtype=np.float32, shape=(3, 5), mode="w+")
        a.fill(1.0)

        # Create an ndarray view on the memmap instance
        a_view = np.asarray(a)
        assert not isinstance(a_view, np.memmap)
        assert has_shareable_memory(a_view)

        p.map(
            inplace_double,
            [
                (a_view, (i, j), 1.0)
                for i in range(a.shape[0])
                for j in range(a.shape[1])
            ],
        )

        # Both a and a_view have been updated
        assert_array_equal(a, 2 * np.ones(a.shape))
        assert_array_equal(a_view, 2 * np.ones(a.shape))

        # Passing memmap array view to the pool should not trigger the
        # creation of new files on the FS
        assert os.listdir(pool_temp_folder) == []
    finally:
        p.terminate()
        del p

def test_permission_error_windows_reference_cycle(backend):
    # Non regression test for:
    # https://github.com/joblib/joblib/issues/806
    #
    # The issue happens when trying to delete a memory mapped file that has
    # not yet been closed by one of the worker processes.
    cmd = """if 1:
        import numpy as np
        from joblib import Parallel, delayed

        data = np.random.rand(int(2e6)).reshape((int(1e6), 2))

        # Build a complex cyclic reference that is likely to delay garbage
        # collection of the memmapped array in the worker processes.
        first_list = current_list = [data]
        for i in range(10):
            current_list = [current_list]
            first_list.append(current_list)

        if __name__ == "__main__":
            results = Parallel(n_jobs=2, backend="{b}")(
                delayed(len)(current_list) for i in range(10))
            assert results == [1] * 10
    """.format(b=backend)
    p = subprocess.Popen(
        [sys.executable, "-c", cmd], stderr=subprocess.PIPE, stdout=subprocess.PIPE
    )
    p.wait()
    out, err = p.communicate()
    assert p.returncode == 0, out.decode() + "\n\n" + err.decode()

def test_permission_error_windows_memmap_sent_to_parent(backend):
    # Second non-regression test for:
    # https://github.com/joblib/joblib/issues/806
    # Previously, child processes would not convert temporary memmaps to numpy
    # arrays when sending the data back to the parent process. This would lead
    # to permission errors on Windows when deleting joblib's temporary folder,
    # as the handles of the memmapped files would still be open in the parent
    # process.
    cmd = """if 1:
        import os
        import time

        import numpy as np

        from joblib import Parallel, delayed
        from testutils import return_slice_of_data

        data = np.ones(int(2e6))

        if __name__ == '__main__':
            # warm-up call to launch the workers and start the resource_tracker
            _ = Parallel(n_jobs=2, verbose=5, backend='{b}')(
                delayed(id)(i) for i in range(20))

            time.sleep(0.5)

            slice_of_data = Parallel(n_jobs=2, verbose=5, backend='{b}')(
                delayed(return_slice_of_data)(data, 0, 20) for _ in range(10))
    """.format(b=backend)

    for _ in range(3):
        env = os.environ.copy()
        env["PYTHONPATH"] = os.path.dirname(__file__)
        p = subprocess.Popen(
            [sys.executable, "-c", cmd],
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE,
            env=env,
        )
        p.wait()
        out, err = p.communicate()
        assert p.returncode == 0, err
        assert out == b""
        assert b"resource_tracker" not in err

def test_parallel_isolated_temp_folders(backend):
    # Test that consecutive Parallel calls use isolated subfolders, even
    # for the loky backend that reuses its executor instance across calls.
    array = np.arange(int(1e2))

    [filename_1] = Parallel(n_jobs=2, backend=backend, max_nbytes=10)(
        delayed(getattr)(array, "filename") for _ in range(1)
    )
    [filename_2] = Parallel(n_jobs=2, backend=backend, max_nbytes=10)(
        delayed(getattr)(array, "filename") for _ in range(1)
    )
    assert os.path.dirname(filename_2) != os.path.dirname(filename_1)

def test_managed_backend_reuse_temp_folder(backend):
    # Test that calls to a managed parallel object reuse the same memmaps.
    array = np.arange(int(1e2))

    with Parallel(n_jobs=2, backend=backend, max_nbytes=10) as p:
        [filename_1] = p(delayed(getattr)(array, "filename") for _ in range(1))
        [filename_2] = p(delayed(getattr)(array, "filename") for _ in range(1))
    assert os.path.dirname(filename_2) == os.path.dirname(filename_1)

def test_memmapping_temp_folder_thread_safety():
    # Concurrent calls to Parallel with the loky backend will use the same
    # executor, and thus the same reducers. Make sure that those reducers use
    # different temporary folders depending on which Parallel objects called
    # them, which is necessary to limit potential race conditions during the
    # garbage collection of temporary memmaps.
    array = np.arange(int(1e2))

    temp_dirs_thread_1 = set()
    temp_dirs_thread_2 = set()

    def concurrent_get_filename(array, temp_dirs):
        with Parallel(backend="loky", n_jobs=2, max_nbytes=10) as p:
            for i in range(10):
                [filename] = p(delayed(getattr)(array, "filename") for _ in range(1))
                temp_dirs.add(os.path.dirname(filename))

    t1 = threading.Thread(
        target=concurrent_get_filename, args=(array, temp_dirs_thread_1)
    )
    t2 = threading.Thread(
        target=concurrent_get_filename, args=(array, temp_dirs_thread_2)
    )

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    assert len(temp_dirs_thread_1) == 1
    assert len(temp_dirs_thread_2) == 1

    assert temp_dirs_thread_1 != temp_dirs_thread_2

def test_multithreaded_parallel_termination_resource_tracker_silent():
    # Test that concurrent termination attempts of the same executor do not
    # emit any spurious error from the resource_tracker. We test various
    # situations making 0, 1 or both Parallel calls send a task that will
    # make the worker (and thus the whole Parallel call) error out.
    cmd = """if 1:
        import os

        import numpy as np
        from joblib import Parallel, delayed
        from joblib.externals.loky.backend import resource_tracker
        from concurrent.futures import ThreadPoolExecutor, wait

        resource_tracker.VERBOSE = 0

        array = np.arange(int(1e2))

        temp_dirs_thread_1 = set()
        temp_dirs_thread_2 = set()

        def raise_error(array):
            raise ValueError

        def parallel_get_filename(array, temp_dirs):
            with Parallel(backend="loky", n_jobs=2, max_nbytes=10) as p:
                for i in range(10):
                    [filename] = p(
                        delayed(getattr)(array, "filename") for _ in range(1)
                    )
                    temp_dirs.add(os.path.dirname(filename))

        def parallel_raise(array, temp_dirs):
            with Parallel(backend="loky", n_jobs=2, max_nbytes=10) as p:
                for i in range(10):
                    [filename] = p(
                        delayed(raise_error)(array) for _ in range(1)
                    )
                    temp_dirs.add(os.path.dirname(filename))

        executor = ThreadPoolExecutor(max_workers=2)

        # both function calls will use the same loky executor, but with a
        # different Parallel object.
        future_1 = executor.submit({f1}, array, temp_dirs_thread_1)
        future_2 = executor.submit({f2}, array, temp_dirs_thread_2)

        # Wait for both threads to terminate their backend
        wait([future_1, future_2])

        future_1.result()
        future_2.result()
    """
    functions_and_returncodes = [
        ("parallel_get_filename", "parallel_get_filename", 0),
        ("parallel_get_filename", "parallel_raise", 1),
        ("parallel_raise", "parallel_raise", 1),
    ]

    for f1, f2, returncode in functions_and_returncodes:
        p = subprocess.Popen(
            [sys.executable, "-c", cmd.format(f1=f1, f2=f2)],
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE,
        )
        p.wait()
        _, err = p.communicate()
        assert p.returncode == returncode, err.decode()
        assert b"resource_tracker" not in err, err.decode()

def test_many_parallel_calls_on_same_object(backend):
    # After #966 got merged, consecutive Parallel objects were sharing a temp
    # folder, which would lead to race conditions during the management of
    # temporary resources by the resource_tracker. This is a non-regression
    # test that makes sure that consecutive Parallel operations on the same
    # object do not error out.
    cmd = """if 1:
        import os
        import time

        import numpy as np

        from joblib import Parallel, delayed
        from testutils import return_slice_of_data

        data = np.ones(100)

        if __name__ == '__main__':
            for i in range(5):
                slice_of_data = Parallel(
                    n_jobs=2, max_nbytes=1, backend='{b}')(
                    delayed(return_slice_of_data)(data, 0, 20)
                    for _ in range(10)
                )
    """.format(b=backend)
    env = os.environ.copy()
    env["PYTHONPATH"] = os.path.dirname(__file__)
    p = subprocess.Popen(
        [sys.executable, "-c", cmd],
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
        env=env,
    )
    p.wait()
    out, err = p.communicate()
    assert p.returncode == 0, err.decode()
    assert out == b"", out.decode()
    assert b"resource_tracker" not in err

def test_memmap_returned_as_regular_array(backend):
    data = np.ones(int(1e3))
    # Check that child processes send temporary memmaps back as numpy arrays.
    [result] = Parallel(n_jobs=2, backend=backend, max_nbytes=100)(
        delayed(check_memmap_and_send_back)(data) for _ in range(1)
    )
    assert _get_backing_memmap(result) is None

def test_resource_tracker_silent_when_reference_cycles(backend):
    # There is a variety of reasons that can make joblib with loky backend
    # output noisy warnings when a reference cycle is preventing a memmap from
    # being garbage collected. Especially, joblib's main process finalizer
    # deletes the temporary folder if it was not done before, which can
    # interact badly with the resource_tracker. We don't risk leaking any
    # resources, but this will likely make joblib output a lot of low-level
    # confusing messages.
    #
    # This test makes sure that the resource_tracker is silent when a
    # reference has been collected concurrently on non-Windows platforms.
    #
    # Note that the script in ``cmd`` is the exact same script as in
    # test_permission_error_windows_reference_cycle.
    if backend == "loky" and sys.platform.startswith("win"):
        # XXX: on Windows, reference cycles can delay timely garbage collection
        # and make it impossible to properly delete the temporary folder in the
        # main process because of permission errors.
        pytest.xfail(
            "The temporary folder cannot be deleted on Windows in the "
            "presence of a reference cycle"
        )
    cmd = """if 1:
        import numpy as np
        from joblib import Parallel, delayed

        data = np.random.rand(int(2e6)).reshape((int(1e6), 2))

        # Build a complex cyclic reference that is likely to delay garbage
        # collection of the memmapped array in the worker processes.
        first_list = current_list = [data]
        for i in range(10):
            current_list = [current_list]
            first_list.append(current_list)

        if __name__ == "__main__":
            results = Parallel(n_jobs=2, backend="{b}")(
                delayed(len)(current_list) for i in range(10))
            assert results == [1] * 10
    """.format(b=backend)
    p = subprocess.Popen(
        [sys.executable, "-c", cmd], stderr=subprocess.PIPE, stdout=subprocess.PIPE
    )
    p.wait()
    out, err = p.communicate()
    out = out.decode()
    err = err.decode()
    assert p.returncode == 0, out + "\n\n" + err
    assert "resource_tracker" not in err, err

def test_memmapping_pool_for_large_arrays(factory, tmpdir):
    """Check that large arrays are not copied in memory"""
    # Check that the tempfolder is empty
    assert os.listdir(tmpdir.strpath) == []

    # Build an array reducer that automatically dumps large array content
    # to filesystem backed memmap instances to avoid memory explosion
    p = factory(3, max_nbytes=40, temp_folder=tmpdir.strpath, verbose=2)
    try:
        # The temporary folder for the pool is not provisioned in advance
        assert os.listdir(tmpdir.strpath) == []
        assert not os.path.exists(p._temp_folder)

        small = np.ones(5, dtype=np.float32)
        assert small.nbytes == 20
        p.map(check_array, [(small, i, 1.0) for i in range(small.shape[0])])

        # Memory has been copied, the pool filesystem folder is unused
        assert os.listdir(tmpdir.strpath) == []

        # Try with an array larger than the memmap threshold of 40 bytes
        large = np.ones(100, dtype=np.float64)
        assert large.nbytes == 800
        p.map(check_array, [(large, i, 1.0) for i in range(large.shape[0])])

        # The data has been dumped in a temp folder for subprocesses to share
        # it without per-child memory copies
        assert os.path.isdir(p._temp_folder)
        dumped_filenames = os.listdir(p._temp_folder)
        assert len(dumped_filenames) == 1

        # Check that memory mapping is not triggered for arrays with
        # dtype='object'
        objects = np.array(["abc"] * 100, dtype="object")
        results = p.map(has_shareable_memory, [objects])
        assert not results[0]
    finally:
        # check FS garbage upon pool termination
        p.terminate()
        for i in range(10):
            sleep(0.1)
            if not os.path.exists(p._temp_folder):
                break
        else:  # pragma: no cover
            raise AssertionError(
                "temporary folder {} was not deleted".format(p._temp_folder)
            )
        del p

def test_child_raises_parent_exits_cleanly(backend):
    # When a task executed by a child process raises an error, the parent
    # process's backend is notified, and calls abort_everything.
    # In loky, abort_everything itself calls shutdown(kill_workers=True) which
    # sends SIGKILL to the worker, preventing it from running the finalizers
    # supposed to signal the resource_tracker when the worker is done using
    # objects relying on a shared resource (e.g. np.memmaps). Because this
    # behavior is prone to:
    # - cause a resource leak
    # - make the resource tracker emit noisy resource warnings
    # we explicitly test that, when the said situation occurs:
    # - no resources are actually leaked
    # - the temporary resources are deleted as soon as possible (typically, at
    #   the end of the failing Parallel call)
    # - the resource_tracker does not emit any warnings.
    cmd = """if 1:
        import os
        from pathlib import Path
        from time import sleep

        import numpy as np
        from joblib import Parallel, delayed
        from testutils import print_filename_and_raise

        data = np.random.rand(1000)

        def get_temp_folder(parallel_obj, backend):
            if "{b}" == "loky":
                return Path(parallel_obj._backend._workers._temp_folder)
            else:
                return Path(parallel_obj._backend._pool._temp_folder)

        if __name__ == "__main__":
            try:
                with Parallel(n_jobs=2, backend="{b}", max_nbytes=100) as p:
                    temp_folder = get_temp_folder(p, "{b}")
                    p(delayed(print_filename_and_raise)(data)
                      for i in range(1))
            except ValueError as e:
                # the temporary folder should be deleted by the end of this
                # call but apparently on some file systems, this takes
                # some time to be visible.
                #
                # We attempt to write into the temporary folder to test for
                # its existence and we wait for a maximum of 10 seconds.
                for i in range(100):
                    try:
                        with open(temp_folder / "some_file.txt", "w") as f:
                            f.write("some content")
                    except FileNotFoundError:
                        # temp_folder has been deleted, all is fine
                        break

                    # ... else, wait a bit and try again
                    sleep(.1)
                else:
                    raise AssertionError(
                        str(temp_folder) + " was not deleted"
                    ) from e
    """.format(b=backend)
    env = os.environ.copy()
    env["PYTHONPATH"] = os.path.dirname(__file__)
    p = subprocess.Popen(
        [sys.executable, "-c", cmd],
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
        env=env,
    )
    p.wait()
    out, err = p.communicate()
    out, err = out.decode(), err.decode()
    filename = out.split("\n")[0]
    assert p.returncode == 0, err or out
    assert err == ""  # no resource_tracker warnings.
    assert not os.path.exists(filename)

def test_memmapping_pool_for_large_arrays_disabled(factory, tmpdir):
    """Check that large arrays memmapping can be disabled"""
    # Set max_nbytes to None to disable the auto memmapping feature
    p = factory(3, max_nbytes=None, temp_folder=tmpdir.strpath)
    try:
        # Check that the tempfolder is empty
        assert os.listdir(tmpdir.strpath) == []

        # Try with an array larger than the memmap threshold of 40 bytes
        # used in the previous test
        large = np.ones(100, dtype=np.float64)
        assert large.nbytes == 800
        p.map(check_array, [(large, i, 1.0) for i in range(large.shape[0])])

        # Check that the tempfolder is still empty
        assert os.listdir(tmpdir.strpath) == []
    finally:
        # Cleanup open file descriptors
        p.terminate()
        del p

def test_memmapping_on_large_enough_dev_shm(factory):
    """Check that memmapping uses /dev/shm when possible"""
    orig_size = jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE
    try:
        # Make joblib believe that it can use /dev/shm even when running on a
        # CI container where the size of the /dev/shm is not very large (that
        # is at least 32 MB instead of 2 GB by default).
        jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE = int(32e6)
        p = factory(3, max_nbytes=10)
        try:
            # Check that the pool has correctly detected the presence of the
            # shared memory filesystem.
            pool_temp_folder = p._temp_folder
            folder_prefix = "/dev/shm/joblib_memmapping_folder_"
            assert pool_temp_folder.startswith(folder_prefix)
            assert os.path.exists(pool_temp_folder)

            # Try with an array larger than the memmap threshold of 10 bytes
            a = np.ones(100, dtype=np.float64)
            assert a.nbytes == 800
            p.map(id, [a] * 10)

            # a should have been memmapped to the pool temp folder: the joblib
            # pickling procedure generates one .pkl file:
            assert len(os.listdir(pool_temp_folder)) == 1

            # create a new array with content that is different from 'a' so
            # that it is mapped to a different file in the temporary folder of
            # the pool.
            b = np.ones(100, dtype=np.float64) * 2
            assert b.nbytes == 800
            p.map(id, [b] * 10)

            # Copies of both a and b are now stored in the shared memory folder
            assert len(os.listdir(pool_temp_folder)) == 2
        finally:
            # Cleanup open file descriptors
            p.terminate()
            del p

        for i in range(100):
            # The temp folder is cleaned up upon pool termination
            if not os.path.exists(pool_temp_folder):
                break
            sleep(0.1)
        else:  # pragma: no cover
            raise AssertionError("temporary folder of pool was not deleted")
    finally:
        jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE = orig_size

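# Symmetric check: when /dev/shm is considered too small, the pool must fall
# back to a regular on-disk temporary folder.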
def test_memmapping_on_too_small_dev_shm(factory):
    orig_size = jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE
    try:
        # Make joblib believe that it cannot use /dev/shm unless there are
        # 42 exabytes of available shared memory in /dev/shm
        jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE = int(42e18)
        p = factory(3, max_nbytes=10)
        try:
            # Check that the pool did not select the shared memory filesystem
            # as its temporary folder.
            pool_temp_folder = p._temp_folder
            assert not pool_temp_folder.startswith("/dev/shm")
        finally:
            # Cleanup open file descriptors
            p.terminate()
            del p

        # The temp folder is cleaned up upon pool termination
        assert not os.path.exists(pool_temp_folder)
    finally:
        jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE = orig_size

def test_memmapping_pool_for_large_arrays_in_return(factory, tmpdir):
    """Check that large arrays are not copied in memory in return"""
    assert_array_equal = np.testing.assert_array_equal

    # Build an array reducer that automatically dumps large array content
    # but check that the returned data structures are regular arrays, to avoid
    # passing a memmap array pointing to a pool controlled temp folder that
    # might be confusing to the user

    # The MemmappingPool user can always return numpy.memmap objects
    # explicitly to avoid memory copy
    p = factory(3, max_nbytes=10, temp_folder=tmpdir.strpath)
    try:
        res = p.apply_async(np.ones, args=(1000,))
        large = res.get()
        assert not has_shareable_memory(large)
        assert_array_equal(large, np.ones(1000))
    finally:
        p.terminate()
        del p

def _worker_multiply(a, n_times):
    """Multiplication function to be executed by subprocess"""
    assert has_shareable_memory(a)
    return a * n_times

def test_workaround_against_bad_memmap_with_copied_buffers(factory, tmpdir):
    """Check that memmaps with a bad buffer are returned as regular arrays

    Unary operations and ufuncs on memmap instances return a new memmap
    instance with an in-memory buffer (probably a numpy bug).
    """
    assert_array_equal = np.testing.assert_array_equal

    p = factory(3, max_nbytes=10, temp_folder=tmpdir.strpath)
    try:
        # Send a complex, large-ish view on an array that will be converted to
        # a memmap in the worker process
        a = np.asarray(np.arange(6000).reshape((1000, 2, 3)), order="F")[:, :1, :]

        # Call a non-inplace multiply operation in the worker on the memmapped
        # array and send the result back to the parent.
        b = p.apply_async(_worker_multiply, args=(a, 3)).get()
        assert not has_shareable_memory(b)
        assert_array_equal(b, 3 * a)
    finally:
        p.terminate()
        del p

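# Trivial helper sent to worker processes to get an object back unchanged.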
def identity(arg):
    return arg

def test_pool_memmap_with_big_offset(factory, retry_no, tmpdir):
    # Test that numpy memmap offset is set correctly if greater than
    # mmap.ALLOCATIONGRANULARITY, see
    # https://github.com/joblib/joblib/issues/451 and
    # https://github.com/numpy/numpy/pull/8443 for more details.
    fname = tmpdir.join("test.mmap").strpath
    size = 5 * mmap.ALLOCATIONGRANULARITY
    offset = mmap.ALLOCATIONGRANULARITY + 1
    obj = make_memmap(fname, mode="w+", shape=size, dtype="uint8", offset=offset)

    p = factory(2, temp_folder=tmpdir.strpath)
    result = p.apply_async(identity, args=(obj,)).get()
    assert isinstance(result, np.memmap)
    assert result.offset == offset
    np.testing.assert_array_equal(obj, result)
    p.terminate()

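# Check that _get_temp_dir honors an explicitly provided temp folder and
# otherwise falls back to a platform-dependent default (shared memory is not
# reported as available on Windows).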
def test_pool_get_temp_dir(tmpdir):
    pool_folder_name = "test.tmpdir"
    pool_folder, shared_mem = _get_temp_dir(pool_folder_name, tmpdir.strpath)
    assert shared_mem is False
    assert pool_folder == tmpdir.join("test.tmpdir").strpath

    pool_folder, shared_mem = _get_temp_dir(pool_folder_name, temp_folder=None)
    if sys.platform.startswith("win"):
        assert shared_mem is False
    assert pool_folder.endswith(pool_folder_name)

def test_pool_get_temp_dir_no_statvfs(tmpdir, monkeypatch):
    """Check that _get_temp_dir works when os.statvfs is not defined

    Regression test for #902
    """
    pool_folder_name = "test.tmpdir"
    import joblib._memmapping_reducer

    if hasattr(joblib._memmapping_reducer.os, "statvfs"):
        # We are on Unix, since Windows doesn't have this function
        monkeypatch.delattr(joblib._memmapping_reducer.os, "statvfs")

    pool_folder, shared_mem = _get_temp_dir(pool_folder_name, temp_folder=None)
    if sys.platform.startswith("win"):
        assert shared_mem is False
    assert pool_folder.endswith(pool_folder_name)

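# Check that every auto-memmapped input array gets its own backing file so
# that in-place writes performed in the workers do not leak across tasks.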
def test_numpy_arrays_use_different_memory(mmap_mode):
    def func(arr, value):
        arr[:] = value
        return arr

    arrays = [np.zeros((10, 10), dtype="float64") for i in range(10)]

    results = Parallel(mmap_mode=mmap_mode, max_nbytes=0, n_jobs=2)(
        delayed(func)(arr, i) for i, arr in enumerate(arrays)
    )

    for i, arr in enumerate(results):
        np.testing.assert_array_equal(arr, i)

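# Exercise _WeakArrayKeyMap, the identity-keyed mapping used to memoize array
# dumps: entries must disappear once the key array is garbage collected.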
def test_weak_array_key_map():
    def assert_empty_after_gc_collect(container, retries=100):
        for i in range(retries):
            if len(container) == 0:
                return
            gc.collect()
            sleep(0.1)
        assert len(container) == 0

    a = np.ones(42)
    m = _WeakArrayKeyMap()
    m.set(a, "a")
    assert m.get(a) == "a"

    b = a
    assert m.get(b) == "a"
    m.set(b, "b")
    assert m.get(a) == "b"

    del a
    gc.collect()
    assert len(m._data) == 1
    assert m.get(b) == "b"

    del b
    assert_empty_after_gc_collect(m._data)

    c = np.ones(42)
    m.set(c, "c")
    assert len(m._data) == 1
    assert m.get(c) == "c"

    with raises(KeyError):
        m.get(np.ones(42))

    del c
    assert_empty_after_gc_collect(m._data)

    # Check that creating and dropping numpy arrays with potentially the same
    # object id will not cause the map to get confused.
    def get_set_get_collect(m, i):
        a = np.ones(42)
        with raises(KeyError):
            m.get(a)
        m.set(a, i)
        assert m.get(a) == i
        return id(a)

    unique_ids = set([get_set_get_collect(m, i) for i in range(1000)])
    if platform.python_implementation() == "CPython":
        # On CPython (at least) the same id is often reused many times for the
        # temporary arrays created under the local scope of the
        # get_set_get_collect function without causing any spurious lookups /
        # insertions in the map. Apparently on free-threaded Python, the id is
        # not reused as often.
        max_len_unique_ids = 400 if IS_GIL_DISABLED else 100
        assert len(unique_ids) < max_len_unique_ids

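# _WeakArrayKeyMap holds process-local weak references and is expected to
# refuse being pickled.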
def test_weak_array_key_map_no_pickling():
    m = _WeakArrayKeyMap()
    with raises(pickle.PicklingError):
        pickle.dumps(m)

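# Check that arrays built directly on a raw mmap.mmap buffer (rather than
# np.memmap) can be passed to workers and read back from them.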
def test_direct_mmap(tmpdir):
    testfile = str(tmpdir.join("arr.dat"))
    a = np.arange(10, dtype="uint8")
    a.tofile(testfile)

    def _read_array():
        with open(testfile) as fd:
            mm = mmap.mmap(fd.fileno(), 0, access=mmap.ACCESS_READ, offset=0)
        return np.ndarray((10,), dtype=np.uint8, buffer=mm, offset=0)

    def func(x):
        return x**2

    arr = _read_array()

    # this gives the reference result of the function with an array
    ref = Parallel(n_jobs=2)(delayed(func)(x) for x in [a])

    # now test that it works with the mmap array
    results = Parallel(n_jobs=2)(delayed(func)(x) for x in [arr])
    np.testing.assert_array_equal(results, ref)

    # also test that a mmap array read in the subprocess is correctly returned
    results = Parallel(n_jobs=2)(delayed(_read_array)() for _ in range(1))
    np.testing.assert_array_equal(results[0], arr)

def test_parallel_memmap2d_as_memmap_1d_base(tmpdir):
    # non-regression test for https://github.com/joblib/joblib/issues/1703,
    # where 2D arrays backed by a 1D memmap had unwanted order changes.
    testfile = str(tmpdir.join("arr2.dat"))
    a = np.arange(10, dtype="uint8").reshape(5, 2)
    a.tofile(testfile)

    def _read_array():
        mm = np.memmap(testfile)
        return mm.reshape(5, 2)

    def func(x):
        return x**2

    arr = _read_array()

    # this gives the reference result of the function with an array
    ref = Parallel(n_jobs=2)(delayed(func)(x) for x in [a])

    # now test that it works with a view on a 1D mmap array
    results = Parallel(n_jobs=2)(delayed(func)(x) for x in [arr])
    assert not results[0].flags["F_CONTIGUOUS"]
    np.testing.assert_array_equal(results, ref)

    # also test that returned memmap arrays are correctly ordered
    results = Parallel(n_jobs=2)(delayed(_read_array)() for _ in range(1))
    np.testing.assert_array_equal(results[0], a)