diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 2af31a106dd..ad6b4c20b56 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -80,7 +80,14 @@ def _worker(executor_reference, work_queue, initializer, initargs): work_item.run() # Delete references to object. See issue16284 del work_item + + # attempt to increment idle count + executor = executor_reference() + if executor is not None: + executor._idle_semaphore.release() + del executor continue + executor = executor_reference() # Exit if: # - The interpreter is shutting down OR @@ -133,6 +140,7 @@ def __init__(self, max_workers=None, thread_name_prefix='', self._max_workers = max_workers self._work_queue = queue.SimpleQueue() + self._idle_semaphore = threading.Semaphore(0) self._threads = set() self._broken = False self._shutdown = False @@ -178,12 +186,15 @@ def submit(*args, **kwargs): submit.__doc__ = _base.Executor.submit.__doc__ def _adjust_thread_count(self): + # if idle threads are available, don't spin new threads + if self._idle_semaphore.acquire(timeout=0): + return + # When the executor gets lost, the weakref callback will wake up # the worker threads. def weakref_cb(_, q=self._work_queue): q.put(None) - # TODO(bquinlan): Should avoid creating new threads if there are more - # idle threads than items in the work queue. + num_threads = len(self._threads) if num_threads < self._max_workers: thread_name = '%s_%d' % (self._thread_name_prefix or self, diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 212ccd8d532..de6ad8f2aa1 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -346,10 +346,15 @@ def _prime_executor(self): pass def test_threads_terminate(self): - self.executor.submit(mul, 21, 2) - self.executor.submit(mul, 6, 7) - self.executor.submit(mul, 3, 14) + def acquire_lock(lock): + lock.acquire() + + sem = threading.Semaphore(0) + for i in range(3): + self.executor.submit(acquire_lock, sem) self.assertEqual(len(self.executor._threads), 3) + for i in range(3): + sem.release() self.executor.shutdown() for t in self.executor._threads: t.join() @@ -753,6 +758,27 @@ def test_default_workers(self): self.assertEqual(executor._max_workers, (os.cpu_count() or 1) * 5) + def test_saturation(self): + executor = self.executor_type(4) + def acquire_lock(lock): + lock.acquire() + + sem = threading.Semaphore(0) + for i in range(15 * executor._max_workers): + executor.submit(acquire_lock, sem) + self.assertEqual(len(executor._threads), executor._max_workers) + for i in range(15 * executor._max_workers): + sem.release() + executor.shutdown(wait=True) + + def test_idle_thread_reuse(self): + executor = self.executor_type() + executor.submit(mul, 21, 2).result() + executor.submit(mul, 6, 7).result() + executor.submit(mul, 3, 14).result() + self.assertEqual(len(executor._threads), 1) + executor.shutdown(wait=True) + class ProcessPoolExecutorTest(ExecutorTest): diff --git a/Misc/NEWS.d/next/Library/2018-04-04-14-54-30.bpo-24882.urybpa.rst b/Misc/NEWS.d/next/Library/2018-04-04-14-54-30.bpo-24882.urybpa.rst new file mode 100644 index 00000000000..8c418824a99 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-04-04-14-54-30.bpo-24882.urybpa.rst @@ -0,0 +1 @@ +Change ThreadPoolExecutor to use existing idle threads before spinning up new ones. \ No newline at end of file