pyodide/packages/joblib/patches/0001-No-multiprocessing-125...

From b285003a08d44ff85327367f37a61489a53217a3 Mon Sep 17 00:00:00 2001
From: Gael Varoquaux <gael.varoquaux@normalesup.org>
Date: Tue, 8 Feb 2022 09:20:22 +0100
Subject: [PATCH] No multiprocessing (#1256)
Co-authored-by: Hood Chatham <roberthoodchatham@gmail.com>
Co-authored-by: Olivier Grisel <olivier.grisel@ensta.org>
---
doc/parallel.rst | 7 ++
joblib/__init__.py | 3 +-
joblib/_cloudpickle_wrapper.py | 17 +++++
joblib/_multiprocessing_helpers.py | 1 +
joblib/parallel.py | 55 ++++++++++++---
joblib/test/test_cloudpickle_wrapper.py | 27 ++++++++
joblib/test/test_memmapping.py | 5 +-
joblib/test/test_missing_multiprocessing.py | 32 +++++++++
joblib/test/test_module.py | 5 +-
joblib/test/test_parallel.py | 75 +++++++++++++++------
10 files changed, 194 insertions(+), 33 deletions(-)
create mode 100644 joblib/_cloudpickle_wrapper.py
create mode 100644 joblib/test/test_cloudpickle_wrapper.py
create mode 100644 joblib/test/test_missing_multiprocessing.py
diff --git a/doc/parallel.rst b/doc/parallel.rst
index 466d613..8514e2d 100644
--- a/doc/parallel.rst
+++ b/doc/parallel.rst
@@ -71,6 +71,13 @@ call to :class:`joblib.Parallel` but this is now considered a bad pattern
(when done in a library) as it does not make it possible to override that
choice with the ``parallel_backend`` context manager.
+
+.. topic:: The loky backend may not always be available
+
+ Some rare systems do not support multiprocessing (for instance
+ Pyodide). In this case the loky backend is not available and the
+ default backend falls back to threading.
+
Besides builtin joblib backends, we can use
`Joblib Apache Spark Backend <https://github.com/joblib/joblib-spark>`_
to distribute joblib tasks on a Spark cluster.
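A minimal sketch (illustrative only, not part of the patch) of what this documented fallback means in practice: on a build where multiprocessing is unusable, the default backend is threading, and parallel calls still run::

    import joblib
    from joblib._multiprocessing_helpers import mp

    # mp is None on builds such as Pyodide; DEFAULT_BACKEND is then
    # 'threading' instead of 'loky', and this call works either way.
    results = joblib.Parallel(n_jobs=2)(
        joblib.delayed(pow)(i, 2) for i in range(4)
    )
    assert results == [0, 1, 4, 9]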
diff --git a/joblib/__init__.py b/joblib/__init__.py
index 4255c86..9863998 100644
--- a/joblib/__init__.py
+++ b/joblib/__init__.py
@@ -123,8 +123,7 @@ from .parallel import cpu_count
from .parallel import register_parallel_backend
from .parallel import parallel_backend
from .parallel import effective_n_jobs
-
-from .externals.loky import wrap_non_picklable_objects
+from ._cloudpickle_wrapper import wrap_non_picklable_objects
__all__ = ['Memory', 'MemorizedResult', 'PrintTime', 'Logger', 'hash', 'dump',
diff --git a/joblib/_cloudpickle_wrapper.py b/joblib/_cloudpickle_wrapper.py
new file mode 100644
index 0000000..3dbe3ae
--- /dev/null
+++ b/joblib/_cloudpickle_wrapper.py
@@ -0,0 +1,17 @@
+"""
+Small shim of loky's cloudpickle_wrapper to avoid failure when
+multiprocessing is not available.
+"""
+
+
+from ._multiprocessing_helpers import mp
+
+
+def my_wrap_non_picklable_objects(obj, keep_wrapper=True):
+ return obj
+
+
+if mp is None:
+ wrap_non_picklable_objects = my_wrap_non_picklable_objects
+else:
+ from .externals.loky import wrap_non_picklable_objects # noqa
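A hedged usage sketch for the wrapper shimmed above: with multiprocessing present it delegates to loky's cloudpickle-based wrapper; without it the identity shim suffices, since the threading and sequential backends never pickle the callable::

    from joblib import Parallel, delayed, wrap_non_picklable_objects

    def make_adder(n):
        # Nested functions are not picklable with the stdlib pickler,
        # which is the case this wrapper exists to handle.
        def add(x):
            return x + n
        return add

    add3 = wrap_non_picklable_objects(make_adder(3))
    print(Parallel(n_jobs=2)(delayed(add3)(i) for i in range(3)))  # [3, 4, 5]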
diff --git a/joblib/_multiprocessing_helpers.py b/joblib/_multiprocessing_helpers.py
index 1c5de2f..bde4bc1 100644
--- a/joblib/_multiprocessing_helpers.py
+++ b/joblib/_multiprocessing_helpers.py
@@ -14,6 +14,7 @@ mp = int(os.environ.get('JOBLIB_MULTIPROCESSING', 1)) or None
if mp:
try:
import multiprocessing as mp
+ import _multiprocessing # noqa
except ImportError:
mp = None
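The extra ``import _multiprocessing`` probe matters because single-threaded builds such as Pyodide ship the pure-Python ``multiprocessing`` package while lacking the ``_multiprocessing`` C extension, so the plain import succeeds but any use fails; probing the extension surfaces the breakage early. Downstream code can then treat ``mp`` as a feature flag, as parallel.py does below, roughly::

    from joblib._multiprocessing_helpers import mp

    # mp is the multiprocessing module, or None when it is unusable.
    if mp is None:
        backend = "threading"  # process-based backends cannot work here
    else:
        backend = "loky"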
diff --git a/joblib/parallel.py b/joblib/parallel.py
index 687557e..9eb6308 100644
--- a/joblib/parallel.py
+++ b/joblib/parallel.py
@@ -27,7 +27,6 @@ from ._parallel_backends import (FallbackToBackend, MultiprocessingBackend,
ThreadingBackend, SequentialBackend,
LokyBackend)
from .externals.cloudpickle import dumps, loads
-from .externals import loky
# Make sure that those two classes are part of the public joblib.parallel API
# so that 3rd party backend implementers can import them from here.
@@ -36,15 +35,28 @@ from ._parallel_backends import ParallelBackendBase # noqa
BACKENDS = {
- 'multiprocessing': MultiprocessingBackend,
'threading': ThreadingBackend,
'sequential': SequentialBackend,
- 'loky': LokyBackend,
}
# name of the backend used by default by Parallel outside of any context
# managed by ``parallel_backend``.
-DEFAULT_BACKEND = 'loky'
+
+# threading is the only backend that is always available everywhere
+DEFAULT_BACKEND = 'threading'
+
DEFAULT_N_JOBS = 1
+
+MAYBE_AVAILABLE_BACKENDS = {'multiprocessing', 'loky'}
+
+# if multiprocessing is available, so is loky, and we set loky as the default
+# backend
+if mp is not None:
+ BACKENDS['multiprocessing'] = MultiprocessingBackend
+ from .externals import loky
+ BACKENDS['loky'] = LokyBackend
+ DEFAULT_BACKEND = 'loky'
+
+
DEFAULT_THREAD_BACKEND = 'threading'
# Thread local value that can be overridden by the ``parallel_backend`` context
@@ -135,7 +147,9 @@ class parallel_backend(object):
'threading' is a low-overhead alternative that is most efficient for
functions that release the Global Interpreter Lock: e.g. I/O-bound code or
CPU-bound code in a few calls to native code that explicitly releases the
- GIL.
+ GIL. Note that on some rare systems (such as Pyodide),
+ multiprocessing and loky may not be available, in which case joblib
+ defaults to threading.
In addition, if the `dask` and `distributed` Python packages are installed,
it is possible to use the 'dask' backend for better scheduling of nested
@@ -184,9 +198,20 @@ class parallel_backend(object):
def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
**backend_params):
if isinstance(backend, str):
- if backend not in BACKENDS and backend in EXTERNAL_BACKENDS:
- register = EXTERNAL_BACKENDS[backend]
- register()
+ if backend not in BACKENDS:
+ if backend in EXTERNAL_BACKENDS:
+ register = EXTERNAL_BACKENDS[backend]
+ register()
+ elif backend in MAYBE_AVAILABLE_BACKENDS:
+ warnings.warn(
+ f"joblib backend '{backend}' is not available on "
+ f"your system, falling back to {DEFAULT_BACKEND}.",
+ UserWarning,
+ stacklevel=2)
+ BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
+ else:
+ raise ValueError("Invalid backend: %s, expected one of %r"
+ % (backend, sorted(BACKENDS.keys())))
backend = BACKENDS[backend](**backend_params)
@@ -436,7 +461,9 @@ class Parallel(Logger):
- "loky" used by default, can induce some
communication and memory overhead when exchanging input and
- output data with the worker Python processes.
+ output data with the worker Python processes. On some rare
+ systems (such as Pyodide), the loky backend may not be
+ available.
- "multiprocessing" previous process-based backend based on
`multiprocessing.Pool`. Less robust than `loky`.
- "threading" is a very low-overhead backend but it suffers
@@ -690,6 +717,16 @@ class Parallel(Logger):
# preload modules on the forkserver helper process.
self._backend_args['context'] = backend
backend = MultiprocessingBackend(nesting_level=nesting_level)
+
+ elif backend not in BACKENDS and backend in MAYBE_AVAILABLE_BACKENDS:
+ warnings.warn(
+ f"joblib backend '{backend}' is not available on "
+ f"your system, falling back to {DEFAULT_BACKEND}.",
+ UserWarning,
+ stacklevel=2)
+ BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
+ backend = BACKENDS[DEFAULT_BACKEND](nesting_level=nesting_level)
+
else:
try:
backend_factory = BACKENDS[backend]
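A sketch of the resulting user-facing behavior, assuming a build where ``mp`` is None: asking for 'loky' or 'multiprocessing' warns once and transparently reuses the default (threading) backend rather than raising::

    import warnings
    from joblib import Parallel, delayed

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        out = Parallel(n_jobs=2, backend="loky")(
            delayed(abs)(-i) for i in range(3)
        )
    # Without multiprocessing: one UserWarning about the fallback.
    # With multiprocessing: caught stays empty and loky is used.
    print(out, [str(w.message) for w in caught])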
diff --git a/joblib/test/test_cloudpickle_wrapper.py b/joblib/test/test_cloudpickle_wrapper.py
new file mode 100644
index 0000000..733f51c
--- /dev/null
+++ b/joblib/test/test_cloudpickle_wrapper.py
@@ -0,0 +1,27 @@
+"""
+Test that our implementation of wrap_non_picklable_objects properly
+mimics the loky implementation.
+"""
+
+from .._cloudpickle_wrapper import wrap_non_picklable_objects
+from .._cloudpickle_wrapper import my_wrap_non_picklable_objects
+
+
+def a_function(x):
+ return x
+
+
+class AClass(object):
+
+ def __call__(self, x):
+ return x
+
+
+def test_wrap_non_picklable_objects():
+ # Mostly a smoke test: test that we can use callables in the same way
+ # with both our implementation of wrap_non_picklable_objects and the
+ # upstream one
+ for obj in (a_function, AClass()):
+ wrapped_obj = wrap_non_picklable_objects(obj)
+ my_wrapped_obj = my_wrap_non_picklable_objects(obj)
+ assert wrapped_obj(1) == my_wrapped_obj(1)
diff --git a/joblib/test/test_memmapping.py b/joblib/test/test_memmapping.py
index dc40d23..67ddaef 100644
--- a/joblib/test/test_memmapping.py
+++ b/joblib/test/test_memmapping.py
@@ -146,7 +146,8 @@ def test_memmap_based_array_reducing(tmpdir):
assert_array_equal(b3_reconstructed, b3)
-@skipif(sys.platform != "win32",
+@with_multiprocessing
+@skipif((sys.platform != "win32") or (),
reason="PermissionError only easily triggerable on Windows")
def test_resource_tracker_retries_when_permissionerror(tmpdir):
# Test resource_tracker retry mechanism when unlinking memmaps. See more
@@ -355,6 +356,7 @@ def test_pool_with_memmap_array_view(factory, tmpdir):
@with_numpy
+@with_multiprocessing
@parametrize("backend", ["multiprocessing", "loky"])
def test_permission_error_windows_reference_cycle(backend):
# Non regression test for:
@@ -389,6 +391,7 @@ def test_permission_error_windows_reference_cycle(backend):
@with_numpy
+@with_multiprocessing
@parametrize("backend", ["multiprocessing", "loky"])
def test_permission_error_windows_memmap_sent_to_parent(backend):
# Second non-regression test for:
diff --git a/joblib/test/test_missing_multiprocessing.py b/joblib/test/test_missing_multiprocessing.py
new file mode 100644
index 0000000..251925c
--- /dev/null
+++ b/joblib/test/test_missing_multiprocessing.py
@@ -0,0 +1,32 @@
+"""
+Pyodide and other single-threaded Python builds will be missing the
+_multiprocessing module. Test that joblib still works in this environment.
+"""
+
+import os
+import subprocess
+import sys
+
+
+def test_missing_multiprocessing(tmp_path):
+ """
+ Test that import joblib works even if _multiprocessing is missing.
+
+ pytest has already imported everything from joblib. The most reasonable way
+ to test importing joblib with a modified environment is to invoke a separate
+ Python process. This also ensures that we don't break other tests by
+ importing a bad `_multiprocessing` module.
+ """
+ (tmp_path / "_multiprocessing.py").write_text(
+ 'raise ImportError("No _multiprocessing module!")'
+ )
+ env = dict(os.environ)
+ # For subprocess, use current sys.path with our custom version of
+ # multiprocessing inserted.
+ env["PYTHONPATH"] = ":".join([str(tmp_path)] + sys.path)
+ subprocess.check_call(
+ [sys.executable, "-c",
+ "import joblib, math; "
+ "joblib.Parallel(n_jobs=1)("
+ "joblib.delayed(math.sqrt)(i**2) for i in range(10))"
+ ], env=env)
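The shadowing trick above generalizes: any extension module can be simulated as missing by placing a raising stub first on the child process's PYTHONPATH. A standalone sketch using the stdlib ``_bz2`` extension as a stand-in (same mechanism, hypothetical target)::

    import os
    import subprocess
    import sys
    import tempfile

    with tempfile.TemporaryDirectory() as d:
        with open(os.path.join(d, "_bz2.py"), "w") as f:
            f.write('raise ImportError("No _bz2 module!")')
        # os.pathsep keeps the PYTHONPATH join portable across platforms.
        env = dict(os.environ, PYTHONPATH=os.pathsep.join([d] + sys.path))
        # The child resolves the stub before the real extension module.
        ret = subprocess.run([sys.executable, "-c", "import bz2"], env=env)
        print(ret.returncode)  # non-zero: bz2 cannot import _bz2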
diff --git a/joblib/test/test_module.py b/joblib/test/test_module.py
index 9c3b12b..a2257a4 100644
--- a/joblib/test/test_module.py
+++ b/joblib/test/test_module.py
@@ -1,7 +1,7 @@
import sys
import joblib
-import pytest
from joblib.testing import check_subprocess_call
+from joblib.test.common import with_multiprocessing
def test_version():
@@ -9,6 +9,7 @@ def test_version():
"There are no __version__ argument on the joblib module")
+@with_multiprocessing
def test_no_start_method_side_effect_on_import():
# check that importing joblib does not implicitly set the global
# start_method for multiprocessing.
@@ -22,6 +23,7 @@ def test_no_start_method_side_effect_on_import():
check_subprocess_call([sys.executable, '-c', code])
+@with_multiprocessing
def test_no_semaphore_tracker_on_import():
# check that importing joblib does not implicitly spawn a resource tracker
# or a semaphore tracker
@@ -38,6 +40,7 @@ def test_no_semaphore_tracker_on_import():
check_subprocess_call([sys.executable, '-c', code])
+@with_multiprocessing
def test_no_resource_tracker_on_import():
code = """if True:
import joblib
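For reference, the ``with_multiprocessing`` marker imported above lives in joblib.test.common and is, approximately (paraphrased, not verified against this exact revision), a skip marker keyed on the same ``mp`` flag::

    from joblib._multiprocessing_helpers import mp
    from joblib.testing import skipif

    # Approximate shape of the marker used by the tests in this patch.
    with_multiprocessing = skipif(
        mp is None, reason="Needs multiprocessing to run."
    )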
diff --git a/joblib/test/test_parallel.py b/joblib/test/test_parallel.py
index 7edeb85..4c321e5 100644
--- a/joblib/test/test_parallel.py
+++ b/joblib/test/test_parallel.py
@@ -24,14 +24,17 @@ from importlib import reload
import joblib
from joblib import parallel
from joblib import dump, load
-from joblib.externals.loky import get_reusable_executor
+
+from joblib._multiprocessing_helpers import mp
from joblib.test.common import np, with_numpy
from joblib.test.common import with_multiprocessing
from joblib.testing import (parametrize, raises, check_subprocess_call,
skipif, SkipTest, warns)
-from joblib.externals.loky.process_executor import TerminatedWorkerError
+if mp is not None:
+ # Loky is not available if multiprocessing is not
+ from joblib.externals.loky import get_reusable_executor
from queue import Queue
@@ -69,7 +72,10 @@ from joblib.my_exceptions import WorkerInterrupt
ALL_VALID_BACKENDS = [None] + sorted(BACKENDS.keys())
# Add instances of backend classes deriving from ParallelBackendBase
ALL_VALID_BACKENDS += [BACKENDS[backend_str]() for backend_str in BACKENDS]
-PROCESS_BACKENDS = ['multiprocessing', 'loky']
+if mp is None:
+ PROCESS_BACKENDS = []
+else:
+ PROCESS_BACKENDS = ['multiprocessing', 'loky']
PARALLEL_BACKENDS = PROCESS_BACKENDS + ['threading']
if hasattr(mp, 'get_context'):
@@ -269,6 +275,7 @@ def raise_exception(backend):
raise ValueError
+@with_multiprocessing
def test_nested_loop_with_exception_with_loky():
with raises(ValueError):
with Parallel(n_jobs=2, backend="loky") as parallel:
@@ -568,8 +575,14 @@ class FakeParallelBackend(SequentialBackend):
def test_invalid_backend():
- with raises(ValueError):
+ with raises(ValueError) as excinfo:
Parallel(backend='unit-testing')
+ assert "Invalid backend:" in str(excinfo.value)
+
+ with raises(ValueError) as excinfo:
+ with parallel_backend('unit-testing'):
+ pass
+ assert "Invalid backend:" in str(excinfo.value)
@parametrize('backend', ALL_VALID_BACKENDS)
@@ -600,6 +613,17 @@ def test_overwrite_default_backend():
assert _active_backend_type() == DefaultBackend
+@skipif(mp is not None, reason="Only without multiprocessing")
+def test_backend_no_multiprocessing():
+ with warns(UserWarning,
+ match="joblib backend '.*' is not available on.*"):
+ Parallel(backend='loky')(delayed(square)(i) for i in range(3))
+
+ # The below should now work without problems
+ with parallel_backend('loky'):
+ Parallel()(delayed(square)(i) for i in range(3))
+
+
def check_backend_context_manager(backend_name):
with parallel_backend(backend_name, n_jobs=3):
active_backend, active_n_jobs = parallel.get_active_backend()
@@ -1207,7 +1231,10 @@ def test_memmapping_leaks(backend, tmpdir):
raise AssertionError('temporary directory of Parallel was not removed')
-@parametrize('backend', [None, 'loky', 'threading'])
+@parametrize('backend',
+ ([None, 'threading'] if mp is None
+ else [None, 'loky', 'threading'])
+ )
def test_lambda_expression(backend):
# cloudpickle is used to pickle delayed callables
results = Parallel(n_jobs=2, backend=backend)(
@@ -1237,6 +1264,7 @@ def test_backend_batch_statistics_reset(backend):
p._backend._DEFAULT_SMOOTHED_BATCH_DURATION)
+@with_multiprocessing
def test_backend_hinting_and_constraints():
for n_jobs in [1, 2, -1]:
assert type(Parallel(n_jobs=n_jobs)._backend) == LokyBackend
@@ -1347,12 +1375,13 @@ def test_invalid_backend_hinting_and_constraints():
# requiring shared memory semantics.
Parallel(prefer='processes', require='sharedmem')
- # It is inconsistent to ask explicitly for a process-based parallelism
- # while requiring shared memory semantics.
- with raises(ValueError):
- Parallel(backend='loky', require='sharedmem')
- with raises(ValueError):
- Parallel(backend='multiprocessing', require='sharedmem')
+ if mp is not None:
+ # It is inconsistent to ask explicitly for a process-based
+ # parallelism while requiring shared memory semantics.
+ with raises(ValueError):
+ Parallel(backend='loky', require='sharedmem')
+ with raises(ValueError):
+ Parallel(backend='multiprocessing', require='sharedmem')
def test_global_parallel_backend():
@@ -1437,7 +1466,8 @@ def _recursive_parallel(nesting_limit=None):
return Parallel()(delayed(_recursive_parallel)() for i in range(2))
-@parametrize('backend', ['loky', 'threading'])
+@parametrize('backend',
+ (['threading'] if mp is None else ['loky', 'threading']))
def test_thread_bomb_mitigation(backend):
# Test that recursive parallelism raises a RecursionError rather than
# saturating the operating system resources by creating an unbounded number
@@ -1446,13 +1476,18 @@ def test_thread_bomb_mitigation(backend):
with raises(BaseException) as excinfo:
_recursive_parallel()
exc = excinfo.value
- if backend == "loky" and isinstance(exc, TerminatedWorkerError):
- # The recursion exception can itself cause an error when pickling it to
- # be send back to the parent process. In this case the worker crashes
- # but the original traceback is still printed on stderr. This could be
- # improved but does not seem simple to do and this is is not critical
- # for users (as long as there is no process or thread bomb happening).
- pytest.xfail("Loky worker crash when serializing RecursionError")
+ if backend == "loky":
+ # Local import because loky may not be importable for lack of
+ # multiprocessing
+ from joblib.externals.loky.process_executor import TerminatedWorkerError # noqa
+ if isinstance(exc, TerminatedWorkerError):
+ # The recursion exception can itself cause an error when
+ # pickling it to be sent back to the parent process. In this
+ # case the worker crashes but the original traceback is still
+ # printed on stderr. This could be improved but does not seem
+ # simple to do and this is not critical for users (as long
+ # as there is no process or thread bomb happening).
+ pytest.xfail("Loky worker crash when serializing RecursionError")
else:
assert isinstance(exc, RecursionError)
@@ -1466,7 +1501,7 @@ def _run_parallel_sum():
return env_vars, parallel_sum(100)
-@parametrize("backend", [None, 'loky'])
+@parametrize("backend", ([None, 'loky'] if mp is not None else [None]))
@skipif(parallel_sum is None, reason="Need OpenMP helper compiled")
def test_parallel_thread_limit(backend):
results = Parallel(n_jobs=2, backend=backend)(
--
2.25.1