diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 2b3cc59a9ff..c0775418852 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -261,6 +261,10 @@ def _setup_queues(self): self._quick_put = self._inqueue._writer.send self._quick_get = self._outqueue._reader.recv + def _check_running(self): + if self._state != RUN: + raise ValueError("Pool not running") + def apply(self, func, args=(), kwds={}): ''' Equivalent of `func(*args, **kwds)`. @@ -306,8 +310,7 @@ def imap(self, func, iterable, chunksize=1): ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' - if self._state != RUN: - raise ValueError("Pool not running") + self._check_running() if chunksize == 1: result = IMapIterator(self._cache) self._taskqueue.put( @@ -336,8 +339,7 @@ def imap_unordered(self, func, iterable, chunksize=1): ''' Like `imap()` method but ordering of results is arbitrary. ''' - if self._state != RUN: - raise ValueError("Pool not running") + self._check_running() if chunksize == 1: result = IMapUnorderedIterator(self._cache) self._taskqueue.put( @@ -366,8 +368,7 @@ def apply_async(self, func, args=(), kwds={}, callback=None, ''' Asynchronous version of `apply()` method. ''' - if self._state != RUN: - raise ValueError("Pool not running") + self._check_running() result = ApplyResult(self._cache, callback, error_callback) self._taskqueue.put(([(result._job, 0, func, args, kwds)], None)) return result @@ -385,8 +386,7 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, ''' Helper function to implement map, starmap and their async counterparts. ''' - if self._state != RUN: - raise ValueError("Pool not running") + self._check_running() if not hasattr(iterable, '__len__'): iterable = list(iterable) @@ -625,6 +625,7 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, p.join() def __enter__(self): + self._check_running() return self def __exit__(self, exc_type, exc_val, exc_tb): diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 0b0fe7c9b29..a2dc53c8e4e 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2561,6 +2561,22 @@ def test_release_task_refs(self): # they were released too. self.assertEqual(CountedObject.n_instances, 0) + def test_enter(self): + if self.TYPE == 'manager': + self.skipTest("test not applicable to manager") + + pool = self.Pool(1) + with pool: + pass + # call pool.terminate() + # pool is no longer running + + with self.assertRaises(ValueError): + # bpo-35477: pool.__enter__() fails if the pool is not running + with pool: + pass + pool.join() + def raising(): raise KeyError("key") diff --git a/Misc/NEWS.d/next/Library/2018-12-13-00-10-51.bpo-35477.hHyy06.rst b/Misc/NEWS.d/next/Library/2018-12-13-00-10-51.bpo-35477.hHyy06.rst new file mode 100644 index 00000000000..524df71ed95 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-12-13-00-10-51.bpo-35477.hHyy06.rst @@ -0,0 +1,2 @@ +:meth:`multiprocessing.Pool.__enter__` now fails if the pool is not running: +``with pool:`` fails if used more than once.