From b285003a08d44ff85327367f37a61489a53217a3 Mon Sep 17 00:00:00 2001
From: Gael Varoquaux
Date: Tue, 8 Feb 2022 09:20:22 +0100
Subject: [PATCH] No multiprocessing (#1256)

Co-authored-by: Hood Chatham
Co-authored-by: Olivier Grisel
---
 doc/parallel.rst                            |  7 ++
 joblib/__init__.py                          |  3 +-
 joblib/_cloudpickle_wrapper.py              | 17 +++++
 joblib/_multiprocessing_helpers.py          |  1 +
 joblib/parallel.py                          | 55 ++++++++++++---
 joblib/test/test_cloudpickle_wrapper.py     | 27 ++++++++
 joblib/test/test_memmapping.py              |  5 +-
 joblib/test/test_missing_multiprocessing.py | 32 +++++++++
 joblib/test/test_module.py                  |  5 +-
 joblib/test/test_parallel.py                | 75 +++++++++++++++------
 10 files changed, 194 insertions(+), 33 deletions(-)
 create mode 100644 joblib/_cloudpickle_wrapper.py
 create mode 100644 joblib/test/test_cloudpickle_wrapper.py
 create mode 100644 joblib/test/test_missing_multiprocessing.py

diff --git a/doc/parallel.rst b/doc/parallel.rst
index 466d613..8514e2d 100644
--- a/doc/parallel.rst
+++ b/doc/parallel.rst
@@ -71,6 +71,13 @@ call to :class:`joblib.Parallel` but this is now considered a bad pattern
 (when done in a library) as it does not make it possible to override that
 choice with the ``parallel_backend`` context manager.
 
+
+.. topic:: The loky backend may not always be available
+
+   Some rare systems do not support multiprocessing (for instance
+   Pyodide). In this case the loky backend is not available and the
+   default backend falls back to threading.
+
 Besides builtin joblib backends, we can use
 `Joblib Apache Spark Backend <https://github.com/joblib/joblib-spark>`_
 to distribute joblib tasks on a Spark cluster.
diff --git a/joblib/__init__.py b/joblib/__init__.py
index 4255c86..9863998 100644
--- a/joblib/__init__.py
+++ b/joblib/__init__.py
@@ -123,8 +123,7 @@ from .parallel import cpu_count
 from .parallel import register_parallel_backend
 from .parallel import parallel_backend
 from .parallel import effective_n_jobs
-
-from .externals.loky import wrap_non_picklable_objects
+from ._cloudpickle_wrapper import wrap_non_picklable_objects
 
 
 __all__ = ['Memory', 'MemorizedResult', 'PrintTime', 'Logger', 'hash', 'dump',
diff --git a/joblib/_cloudpickle_wrapper.py b/joblib/_cloudpickle_wrapper.py
new file mode 100644
index 0000000..3dbe3ae
--- /dev/null
+++ b/joblib/_cloudpickle_wrapper.py
@@ -0,0 +1,17 @@
+"""
+Small shim of loky's cloudpickle_wrapper to avoid failure when
+multiprocessing is not available.
+"""
+
+
+from ._multiprocessing_helpers import mp
+
+
+def my_wrap_non_picklable_objects(obj, keep_wrapper=True):
+    return obj
+
+
+if mp is None:
+    wrap_non_picklable_objects = my_wrap_non_picklable_objects
+else:
+    from .externals.loky import wrap_non_picklable_objects  # noqa
diff --git a/joblib/_multiprocessing_helpers.py b/joblib/_multiprocessing_helpers.py
index 1c5de2f..bde4bc1 100644
--- a/joblib/_multiprocessing_helpers.py
+++ b/joblib/_multiprocessing_helpers.py
@@ -14,6 +14,7 @@ mp = int(os.environ.get('JOBLIB_MULTIPROCESSING', 1)) or None
 if mp:
     try:
         import multiprocessing as mp
+        import _multiprocessing  # noqa
     except ImportError:
         mp = None
 
diff --git a/joblib/parallel.py b/joblib/parallel.py
index 687557e..9eb6308 100644
--- a/joblib/parallel.py
+++ b/joblib/parallel.py
@@ -27,7 +27,6 @@ from ._parallel_backends import (FallbackToBackend, MultiprocessingBackend,
                                  ThreadingBackend, SequentialBackend,
                                  LokyBackend)
 from .externals.cloudpickle import dumps, loads
-from .externals import loky
 
 # Make sure that those two classes are part of the public joblib.parallel API
 # so that 3rd party backend implementers can import them from here.
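Taken together, the two helpers above give joblib a single import-time probe
for process support, which the rest of this patch branches on. The sketch
below is illustrative only: it restates the probe from
_multiprocessing_helpers.py and the backend-selection consequence from
parallel.py (shown in the next hunks) as one self-contained snippet; the flat
layout and final print are for demonstration, not part of the patch:

    import os

    # Import-time probe (mirrors joblib/_multiprocessing_helpers.py):
    # multiprocessing can be disabled explicitly via JOBLIB_MULTIPROCESSING=0,
    # and the _multiprocessing C extension is missing on Pyodide-like builds,
    # in which case the probe leaves mp set to None.
    mp = int(os.environ.get('JOBLIB_MULTIPROCESSING', 1)) or None
    if mp:
        try:
            import multiprocessing as mp
            import _multiprocessing  # noqa
        except ImportError:
            mp = None

    # joblib/parallel.py then keys the default backend off the probe:
    # threading is always available, loky only when multiprocessing is.
    DEFAULT_BACKEND = 'threading' if mp is None else 'loky'
    print(DEFAULT_BACKEND)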
@@ -36,15 +35,28 @@ from ._parallel_backends import ParallelBackendBase  # noqa
 
 BACKENDS = {
-    'multiprocessing': MultiprocessingBackend,
     'threading': ThreadingBackend,
     'sequential': SequentialBackend,
-    'loky': LokyBackend,
 }
+
 # name of the backend used by default by Parallel outside of any context
 # managed by ``parallel_backend``.
-DEFAULT_BACKEND = 'loky'
+
+# threading is the only backend that is always available everywhere
+DEFAULT_BACKEND = 'threading'
+
 DEFAULT_N_JOBS = 1
+
+MAYBE_AVAILABLE_BACKENDS = {'multiprocessing', 'loky'}
+
+# If multiprocessing is available, so is loky, and we set loky as the
+# default backend.
+if mp is not None:
+    BACKENDS['multiprocessing'] = MultiprocessingBackend
+    from .externals import loky
+    BACKENDS['loky'] = LokyBackend
+    DEFAULT_BACKEND = 'loky'
+
 
 DEFAULT_THREAD_BACKEND = 'threading'
 
 # Thread local value that can be overridden by the ``parallel_backend`` context
@@ -135,7 +147,9 @@ class parallel_backend(object):
     'threading' is a low-overhead alternative that is most efficient for
     functions that release the Global Interpreter Lock: e.g. I/O-bound code or
     CPU-bound code in a few calls to native code that explicitly releases the
-    GIL.
+    GIL. Note that on some rare systems (such as Pyodide),
+    multiprocessing and loky may not be available, in which case joblib
+    defaults to threading.
 
     In addition, if the `dask` and `distributed` Python packages are installed,
     it is possible to use the 'dask' backend for better scheduling of nested
@@ -184,9 +198,20 @@ class parallel_backend(object):
     def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
                  **backend_params):
         if isinstance(backend, str):
-            if backend not in BACKENDS and backend in EXTERNAL_BACKENDS:
-                register = EXTERNAL_BACKENDS[backend]
-                register()
+            if backend not in BACKENDS:
+                if backend in EXTERNAL_BACKENDS:
+                    register = EXTERNAL_BACKENDS[backend]
+                    register()
+                elif backend in MAYBE_AVAILABLE_BACKENDS:
+                    warnings.warn(
+                        f"joblib backend '{backend}' is not available on "
+                        f"your system, falling back to {DEFAULT_BACKEND}.",
+                        UserWarning,
+                        stacklevel=2)
+                    BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
+                else:
+                    raise ValueError("Invalid backend: %s, expected one of %r"
+                                     % (backend, sorted(BACKENDS.keys())))
 
             backend = BACKENDS[backend](**backend_params)
 
@@ -436,7 +461,9 @@ class Parallel(Logger):
 
             - "loky" used by default, can induce some
               communication and memory overhead when exchanging input and
-              output data with the worker Python processes.
+              output data with the worker Python processes. On some rare
+              systems (such as Pyodide), the loky backend may not be
+              available.
             - "multiprocessing" previous process-based backend based on
               `multiprocessing.Pool`. Less robust than `loky`.
             - "threading" is a very low-overhead backend but it suffers
@@ -690,6 +717,16 @@ class Parallel(Logger):
                 # preload modules on the forkserver helper process.
                 self._backend_args['context'] = backend
                 backend = MultiprocessingBackend(nesting_level=nesting_level)
+
+            elif backend not in BACKENDS and backend in MAYBE_AVAILABLE_BACKENDS:
+                warnings.warn(
+                    f"joblib backend '{backend}' is not available on "
+                    f"your system, falling back to {DEFAULT_BACKEND}.",
+                    UserWarning,
+                    stacklevel=2)
+                BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
+                backend = BACKENDS[DEFAULT_BACKEND](nesting_level=nesting_level)
+
             else:
                 try:
                     backend_factory = BACKENDS[backend]
diff --git a/joblib/test/test_cloudpickle_wrapper.py b/joblib/test/test_cloudpickle_wrapper.py
new file mode 100644
index 0000000..733f51c
--- /dev/null
+++ b/joblib/test/test_cloudpickle_wrapper.py
@@ -0,0 +1,27 @@
+"""
+Test that our implementation of wrap_non_picklable_objects properly
+mimics the loky implementation.
+"""
+
+from .._cloudpickle_wrapper import wrap_non_picklable_objects
+from .._cloudpickle_wrapper import my_wrap_non_picklable_objects
+
+
+def a_function(x):
+    return x
+
+
+class AClass(object):
+
+    def __call__(self, x):
+        return x
+
+
+def test_wrap_non_picklable_objects():
+    # Mostly a smoke test: check that we can use callables in the same way
+    # with both our implementation of wrap_non_picklable_objects and the
+    # upstream one.
+    for obj in (a_function, AClass()):
+        wrapped_obj = wrap_non_picklable_objects(obj)
+        my_wrapped_obj = my_wrap_non_picklable_objects(obj)
+        assert wrapped_obj(1) == my_wrapped_obj(1)
diff --git a/joblib/test/test_memmapping.py b/joblib/test/test_memmapping.py
index dc40d23..67ddaef 100644
--- a/joblib/test/test_memmapping.py
+++ b/joblib/test/test_memmapping.py
@@ -146,7 +146,8 @@ def test_memmap_based_array_reducing(tmpdir):
     assert_array_equal(b3_reconstructed, b3)
 
 
+@with_multiprocessing
 @skipif(sys.platform != "win32",
         reason="PermissionError only easily triggerable on Windows")
 def test_resource_tracker_retries_when_permissionerror(tmpdir):
     # Test resource_tracker retry mechanism when unlinking memmaps. See more
@@ -355,6 +356,7 @@ def test_pool_with_memmap_array_view(factory, tmpdir):
 
 
 @with_numpy
+@with_multiprocessing
 @parametrize("backend", ["multiprocessing", "loky"])
 def test_permission_error_windows_reference_cycle(backend):
     # Non regression test for:
@@ -389,6 +391,7 @@
 
 
 @with_numpy
+@with_multiprocessing
 @parametrize("backend", ["multiprocessing", "loky"])
 def test_permission_error_windows_memmap_sent_to_parent(backend):
     # Second non-regression test for:
diff --git a/joblib/test/test_missing_multiprocessing.py b/joblib/test/test_missing_multiprocessing.py
new file mode 100644
index 0000000..251925c
--- /dev/null
+++ b/joblib/test/test_missing_multiprocessing.py
@@ -0,0 +1,32 @@
+"""
+Pyodide and other single-threaded Python builds will be missing the
+_multiprocessing module. Test that joblib still works in this environment.
+"""
+
+import os
+import subprocess
+import sys
+
+
+def test_missing_multiprocessing(tmp_path):
+    """
+    Test that import joblib works even if _multiprocessing is missing.
+
+    pytest has already imported everything from joblib. The most reasonable
+    way to test importing joblib with a modified environment is to invoke a
+    separate Python process. This also ensures that we don't break other
+    tests by importing a bad `_multiprocessing` module.
+    """
+    (tmp_path / "_multiprocessing.py").write_text(
+        'raise ImportError("No _multiprocessing module!")'
+    )
+    env = dict(os.environ)
+    # For the subprocess, use the current sys.path with our custom version
+    # of _multiprocessing inserted first.
+    env["PYTHONPATH"] = ":".join([str(tmp_path)] + sys.path)
+    subprocess.check_call(
+        [sys.executable, "-c",
+         "import joblib, math; "
+         "joblib.Parallel(n_jobs=1)("
+         "joblib.delayed(math.sqrt)(i**2) for i in range(10))"
+         ], env=env)
diff --git a/joblib/test/test_module.py b/joblib/test/test_module.py
index 9c3b12b..a2257a4 100644
--- a/joblib/test/test_module.py
+++ b/joblib/test/test_module.py
@@ -1,7 +1,7 @@
 import sys
 import joblib
-import pytest
 from joblib.testing import check_subprocess_call
+from joblib.test.common import with_multiprocessing
 
 
 def test_version():
@@ -9,6 +9,7 @@ def test_version():
         "There are no __version__ argument on the joblib module")
 
 
+@with_multiprocessing
 def test_no_start_method_side_effect_on_import():
     # check that importing joblib does not implicitly set the global
     # start_method for multiprocessing.
@@ -22,6 +23,7 @@ def test_no_start_method_side_effect_on_import():
     check_subprocess_call([sys.executable, '-c', code])
 
 
+@with_multiprocessing
 def test_no_semaphore_tracker_on_import():
     # check that importing joblib does not implicitly spawn a resource tracker
     # or a semaphore tracker
@@ -38,6 +40,7 @@ def test_no_semaphore_tracker_on_import():
     check_subprocess_call([sys.executable, '-c', code])
 
 
+@with_multiprocessing
 def test_no_resource_tracker_on_import():
     code = """if True:
         import joblib
diff --git a/joblib/test/test_parallel.py b/joblib/test/test_parallel.py
index 7edeb85..4c321e5 100644
--- a/joblib/test/test_parallel.py
+++ b/joblib/test/test_parallel.py
@@ -24,14 +24,17 @@ from importlib import reload
 import joblib
 from joblib import parallel
 from joblib import dump, load
-from joblib.externals.loky import get_reusable_executor
+
+from joblib._multiprocessing_helpers import mp
 
 from joblib.test.common import np, with_numpy
 from joblib.test.common import with_multiprocessing
 from joblib.testing import (parametrize, raises, check_subprocess_call,
                             skipif, SkipTest, warns)
 
-from joblib.externals.loky.process_executor import TerminatedWorkerError
+if mp is not None:
+    # Loky is not available when multiprocessing is not available
+    from joblib.externals.loky import get_reusable_executor
 
 from queue import Queue
 
@@ -69,7 +72,10 @@ from joblib.my_exceptions import WorkerInterrupt
 ALL_VALID_BACKENDS = [None] + sorted(BACKENDS.keys())
 # Add instances of backend classes deriving from ParallelBackendBase
 ALL_VALID_BACKENDS += [BACKENDS[backend_str]() for backend_str in BACKENDS]
-PROCESS_BACKENDS = ['multiprocessing', 'loky']
+if mp is None:
+    PROCESS_BACKENDS = []
+else:
+    PROCESS_BACKENDS = ['multiprocessing', 'loky']
 PARALLEL_BACKENDS = PROCESS_BACKENDS + ['threading']
 
 if hasattr(mp, 'get_context'):
@@ -269,6 +275,7 @@ def raise_exception(backend):
     raise ValueError
 
 
+@with_multiprocessing
 def test_nested_loop_with_exception_with_loky():
     with raises(ValueError):
         with Parallel(n_jobs=2, backend="loky") as parallel:
@@ -568,8 +575,14 @@ class FakeParallelBackend(SequentialBackend):
 
 
 def test_invalid_backend():
-    with raises(ValueError):
+    with raises(ValueError) as excinfo:
         Parallel(backend='unit-testing')
+    assert "Invalid backend:" in str(excinfo.value)
+
+    with raises(ValueError) as excinfo:
+        with parallel_backend('unit-testing'):
+            pass
+    assert "Invalid backend:" in str(excinfo.value)
 
 
 @parametrize('backend', ALL_VALID_BACKENDS)
@@ -600,6 +613,17 @@ def test_overwrite_default_backend():
     assert _active_backend_type() == DefaultBackend
 
 
+@skipif(mp is not None, reason="Only without multiprocessing")
+def test_backend_no_multiprocessing():
+    with warns(UserWarning,
+               match="joblib backend '.*' is not available on.*"):
+        Parallel(backend='loky')(delayed(square)(i) for i in range(3))
+
+    # The below should now work without problems
+    with parallel_backend('loky'):
+        Parallel()(delayed(square)(i) for i in range(3))
+
+
 def check_backend_context_manager(backend_name):
     with parallel_backend(backend_name, n_jobs=3):
         active_backend, active_n_jobs = parallel.get_active_backend()
@@ -1207,7 +1231,10 @@ def test_memmapping_leaks(backend, tmpdir):
         raise AssertionError('temporary directory of Parallel was not removed')
 
 
-@parametrize('backend', [None, 'loky', 'threading'])
+@parametrize('backend',
+             ([None, 'threading'] if mp is None
+              else [None, 'loky', 'threading'])
+             )
 def test_lambda_expression(backend):
     # cloudpickle is used to pickle delayed callables
     results = Parallel(n_jobs=2, backend=backend)(
@@ -1237,6 +1264,7 @@ def test_backend_batch_statistics_reset(backend):
             p._backend._DEFAULT_SMOOTHED_BATCH_DURATION)
 
 
+@with_multiprocessing
 def test_backend_hinting_and_constraints():
     for n_jobs in [1, 2, -1]:
         assert type(Parallel(n_jobs=n_jobs)._backend) == LokyBackend
@@ -1347,12 +1375,13 @@ def test_invalid_backend_hinting_and_constraints():
         # requiring shared memory semantics.
         Parallel(prefer='processes', require='sharedmem')
 
-    # It is inconsistent to ask explicitly for a process-based parallelism
-    # while requiring shared memory semantics.
-    with raises(ValueError):
-        Parallel(backend='loky', require='sharedmem')
-    with raises(ValueError):
-        Parallel(backend='multiprocessing', require='sharedmem')
+    if mp is not None:
+        # It is inconsistent to explicitly ask for process-based
+        # parallelism while requiring shared memory semantics.
+        with raises(ValueError):
+            Parallel(backend='loky', require='sharedmem')
+        with raises(ValueError):
+            Parallel(backend='multiprocessing', require='sharedmem')
 
 
 def test_global_parallel_backend():
@@ -1437,7 +1466,8 @@ def _recursive_parallel(nesting_limit=None):
     return Parallel()(delayed(_recursive_parallel)() for i in range(2))
 
 
-@parametrize('backend', ['loky', 'threading'])
+@parametrize('backend',
+             (['threading'] if mp is None else ['loky', 'threading']))
 def test_thread_bomb_mitigation(backend):
     # Test that recursive parallelism raises a recursion rather than
     # saturating the operating system resources by creating a unbounded number
@@ -1446,13 +1476,18 @@ def test_thread_bomb_mitigation(backend):
     with raises(BaseException) as excinfo:
         _recursive_parallel()
     exc = excinfo.value
-    if backend == "loky" and isinstance(exc, TerminatedWorkerError):
-        # The recursion exception can itself cause an error when pickling it to
-        # be send back to the parent process. In this case the worker crashes
-        # but the original traceback is still printed on stderr. This could be
-        # improved but does not seem simple to do and this is is not critical
-        # for users (as long as there is no process or thread bomb happening).
-        pytest.xfail("Loky worker crash when serializing RecursionError")
+    if backend == "loky":
+        # Local import because loky may not be importable for lack of
+        # multiprocessing
+        from joblib.externals.loky.process_executor import TerminatedWorkerError  # noqa
+        if isinstance(exc, TerminatedWorkerError):
+            # The recursion exception can itself cause an error when
+            # pickling it to be sent back to the parent process. In this
+            # case the worker crashes but the original traceback is still
+            # printed on stderr. This could be improved but does not seem
+            # simple to do and is not critical for users (as long as
+            # there is no process or thread bomb happening).
+            pytest.xfail("Loky worker crash when serializing RecursionError")
     else:
         assert isinstance(exc, RecursionError)
 
@@ -1466,7 +1501,7 @@ def _run_parallel_sum():
     return env_vars, parallel_sum(100)
 
 
-@parametrize("backend", [None, 'loky'])
+@parametrize("backend", ([None, 'loky'] if mp is not None else [None]))
 @skipif(parallel_sum is None, reason="Need OpenMP helper compiled")
 def test_parallel_thread_limit(backend):
     results = Parallel(n_jobs=2, backend=backend)(
-- 
2.25.1
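For reviewers who want to exercise the fallback path on a regular system, the
following sketch (not part of the patch) shadows the _multiprocessing
extension module the same way test_missing_multiprocessing.py does, then
checks in a subprocess that requesting the loky backend emits the new
UserWarning and still returns correct results through the threading fallback.
The harness itself is hypothetical; only the warning text it matches comes
from the patch:

    import os
    import subprocess
    import sys
    import tempfile

    # Code executed in the child interpreter, where multiprocessing is broken.
    child_code = "\n".join([
        "import warnings",
        "import joblib",
        "with warnings.catch_warnings(record=True) as caught:",
        "    warnings.simplefilter('always')",
        "    out = joblib.Parallel(n_jobs=2, backend='loky')(",
        "        joblib.delayed(abs)(-i) for i in range(3))",
        "assert out == [0, 1, 2], out",
        "assert any('is not available' in str(w.message) for w in caught)",
    ])

    with tempfile.TemporaryDirectory() as tmp:
        # Shadow the C extension so 'import multiprocessing' fails in the
        # child, exactly as the new test does with tmp_path.
        with open(os.path.join(tmp, "_multiprocessing.py"), "w") as f:
            f.write('raise ImportError("No _multiprocessing module!")')
        env = dict(os.environ)
        env["PYTHONPATH"] = os.pathsep.join([tmp] + sys.path)
        subprocess.check_call([sys.executable, "-c", child_code], env=env)

Note that the warning fires only on the first use of an unavailable backend
name in a process; after that, the fallback is registered in BACKENDS and
subsequent Parallel calls proceed silently on threading.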