Implement IOLoop.run_in_executor (#2067)

This commit is contained in:
Mike DePalatis 2017-10-21 14:04:57 -04:00 committed by Ben Darnell
parent 37081d7928
commit 648b3e9f7c
2 changed files with 87 additions and 1 deletions

View File

@ -55,6 +55,10 @@ try:
except ImportError: except ImportError:
signal = None signal = None
try:
from concurrent.futures import ThreadPoolExecutor
except ImportError:
ThreadPoolExecutor = None
if PY3: if PY3:
import _thread as thread import _thread as thread
@ -635,6 +639,29 @@ class IOLoop(Configurable):
future.add_done_callback( future.add_done_callback(
lambda future: self.add_callback(callback, future)) lambda future: self.add_callback(callback, future))
def run_in_executor(self, executor, func, *args):
"""Runs a function in a ``concurrent.futures.Executor``. If
``executor`` is ``None``, the IO loop's default executor will be used.
Use `functools.partial` to pass keyword arguments to `func`.
"""
if ThreadPoolExecutor is None:
raise RuntimeError(
"concurrent.futures is required to use IOLoop.run_in_executor")
if executor is None:
if not hasattr(self, '_executor'):
from tornado.process import cpu_count
self._executor = ThreadPoolExecutor(max_workers=(cpu_count() * 5))
executor = self._executor
return executor.submit(func, *args)
def set_default_executor(self, executor):
"""Sets the default executor to use with :meth:`run_in_executor`."""
self._executor = executor
def _run_callback(self, callback): def _run_callback(self, callback):
"""Runs a callback with error handling. """Runs a callback with error handling.
@ -777,6 +804,8 @@ class PollIOLoop(IOLoop):
self._impl.close() self._impl.close()
self._callbacks = None self._callbacks = None
self._timeouts = None self._timeouts = None
if hasattr(self, '_executor'):
self._executor.shutdown()
def add_handler(self, fd, handler, events): def add_handler(self, fd, handler, events):
fd, obj = self.split_fd(fd) fd, obj = self.split_fd(fd)

View File

@ -18,8 +18,9 @@ from tornado.ioloop import IOLoop, TimeoutError, PollIOLoop, PeriodicCallback
from tornado.log import app_log from tornado.log import app_log
from tornado.platform.select import _Select from tornado.platform.select import _Select
from tornado.stack_context import ExceptionStackContext, StackContext, wrap, NullContext from tornado.stack_context import ExceptionStackContext, StackContext, wrap, NullContext
from tornado.testing import AsyncTestCase, bind_unused_port, ExpectLog from tornado.testing import AsyncTestCase, bind_unused_port, ExpectLog, gen_test
from tornado.test.util import unittest, skipIfNonUnix, skipOnTravis, skipBefore35, exec_test from tornado.test.util import unittest, skipIfNonUnix, skipOnTravis, skipBefore35, exec_test
from tornado.concurrent import Future
try: try:
from concurrent import futures from concurrent import futures
@ -598,6 +599,62 @@ class TestIOLoopFutures(AsyncTestCase):
self.assertEqual(self.exception.args[0], "callback") self.assertEqual(self.exception.args[0], "callback")
self.assertEqual(self.future.exception().args[0], "worker") self.assertEqual(self.future.exception().args[0], "worker")
@gen_test
def test_run_in_executor_gen(self):
event1 = threading.Event()
event2 = threading.Event()
def callback(self_event, other_event):
self_event.set()
time.sleep(0.01)
self.assertTrue(other_event.is_set())
return self_event
res = yield [
IOLoop.current().run_in_executor(None, callback, event1, event2),
IOLoop.current().run_in_executor(None, callback, event2, event1)
]
self.assertEqual([event1, event2], res)
@skipBefore35
def test_run_in_executor_native(self):
event1 = threading.Event()
event2 = threading.Event()
def callback(self_event, other_event):
self_event.set()
time.sleep(0.01)
self.assertTrue(other_event.is_set())
other_event.wait()
return self_event
namespace = exec_test(globals(), locals(), """
async def main():
res = await gen.multi([
IOLoop.current().run_in_executor(None, callback, event1, event2),
IOLoop.current().run_in_executor(None, callback, event2, event1)
])
self.assertEqual([event1, event2], res)
""")
IOLoop.current().run_sync(namespace['main'])
def test_set_default_executor(self):
class MyExecutor(futures.Executor):
def submit(self, func, *args):
return Future()
event = threading.Event()
def future_func():
event.set()
executor = MyExecutor()
loop = IOLoop.current()
loop.set_default_executor(executor)
loop.run_in_executor(None, future_func)
loop.add_timeout(0.01, lambda: self.assertFalse(event.is_set()))
class TestIOLoopRunSync(unittest.TestCase): class TestIOLoopRunSync(unittest.TestCase):
def setUp(self): def setUp(self):