From 648b3e9f7c6abe718fd5cd95fc55834efdcfa020 Mon Sep 17 00:00:00 2001 From: Mike DePalatis Date: Sat, 21 Oct 2017 14:04:57 -0400 Subject: [PATCH] Implement IOLoop.run_in_executor (#2067) --- tornado/ioloop.py | 29 ++++++++++++++++++ tornado/test/ioloop_test.py | 59 ++++++++++++++++++++++++++++++++++++- 2 files changed, 87 insertions(+), 1 deletion(-) diff --git a/tornado/ioloop.py b/tornado/ioloop.py index 73d0cbdb..5686576c 100644 --- a/tornado/ioloop.py +++ b/tornado/ioloop.py @@ -55,6 +55,10 @@ try: except ImportError: signal = None +try: + from concurrent.futures import ThreadPoolExecutor +except ImportError: + ThreadPoolExecutor = None if PY3: import _thread as thread @@ -635,6 +639,29 @@ class IOLoop(Configurable): future.add_done_callback( lambda future: self.add_callback(callback, future)) + def run_in_executor(self, executor, func, *args): + """Runs a function in a ``concurrent.futures.Executor``. If + ``executor`` is ``None``, the IO loop's default executor will be used. + + Use `functools.partial` to pass keyword arguments to `func`. + + """ + if ThreadPoolExecutor is None: + raise RuntimeError( + "concurrent.futures is required to use IOLoop.run_in_executor") + + if executor is None: + if not hasattr(self, '_executor'): + from tornado.process import cpu_count + self._executor = ThreadPoolExecutor(max_workers=(cpu_count() * 5)) + executor = self._executor + + return executor.submit(func, *args) + + def set_default_executor(self, executor): + """Sets the default executor to use with :meth:`run_in_executor`.""" + self._executor = executor + def _run_callback(self, callback): """Runs a callback with error handling. @@ -777,6 +804,8 @@ class PollIOLoop(IOLoop): self._impl.close() self._callbacks = None self._timeouts = None + if hasattr(self, '_executor'): + self._executor.shutdown() def add_handler(self, fd, handler, events): fd, obj = self.split_fd(fd) diff --git a/tornado/test/ioloop_test.py b/tornado/test/ioloop_test.py index 5b9bd9cc..f3cd32ae 100644 --- a/tornado/test/ioloop_test.py +++ b/tornado/test/ioloop_test.py @@ -18,8 +18,9 @@ from tornado.ioloop import IOLoop, TimeoutError, PollIOLoop, PeriodicCallback from tornado.log import app_log from tornado.platform.select import _Select from tornado.stack_context import ExceptionStackContext, StackContext, wrap, NullContext -from tornado.testing import AsyncTestCase, bind_unused_port, ExpectLog +from tornado.testing import AsyncTestCase, bind_unused_port, ExpectLog, gen_test from tornado.test.util import unittest, skipIfNonUnix, skipOnTravis, skipBefore35, exec_test +from tornado.concurrent import Future try: from concurrent import futures @@ -598,6 +599,62 @@ class TestIOLoopFutures(AsyncTestCase): self.assertEqual(self.exception.args[0], "callback") self.assertEqual(self.future.exception().args[0], "worker") + @gen_test + def test_run_in_executor_gen(self): + event1 = threading.Event() + event2 = threading.Event() + + def callback(self_event, other_event): + self_event.set() + time.sleep(0.01) + self.assertTrue(other_event.is_set()) + return self_event + + res = yield [ + IOLoop.current().run_in_executor(None, callback, event1, event2), + IOLoop.current().run_in_executor(None, callback, event2, event1) + ] + + self.assertEqual([event1, event2], res) + + @skipBefore35 + def test_run_in_executor_native(self): + event1 = threading.Event() + event2 = threading.Event() + + def callback(self_event, other_event): + self_event.set() + time.sleep(0.01) + self.assertTrue(other_event.is_set()) + other_event.wait() + return self_event + + namespace = exec_test(globals(), locals(), """ + async def main(): + res = await gen.multi([ + IOLoop.current().run_in_executor(None, callback, event1, event2), + IOLoop.current().run_in_executor(None, callback, event2, event1) + ]) + self.assertEqual([event1, event2], res) + """) + IOLoop.current().run_sync(namespace['main']) + + def test_set_default_executor(self): + class MyExecutor(futures.Executor): + def submit(self, func, *args): + return Future() + + event = threading.Event() + + def future_func(): + event.set() + + executor = MyExecutor() + loop = IOLoop.current() + loop.set_default_executor(executor) + loop.run_in_executor(None, future_func) + loop.add_timeout(0.01, lambda: self.assertFalse(event.is_set())) + class TestIOLoopRunSync(unittest.TestCase): def setUp(self):