Merge pull request #1511 from jampp/master

Remove synchronization in add_callback from the ioloop thread
This commit is contained in:
Ben Darnell 2015-09-27 16:00:45 -04:00
commit 8216a5e599
1 changed files with 25 additions and 10 deletions

View File

@ -909,20 +909,35 @@ class PollIOLoop(IOLoop):
self._cancellations += 1 self._cancellations += 1
def add_callback(self, callback, *args, **kwargs): def add_callback(self, callback, *args, **kwargs):
if thread.get_ident() != self._thread_ident:
# If we're not on the IOLoop's thread, we need to synchronize
# with other threads, or waking logic will induce a race.
with self._callback_lock: with self._callback_lock:
if self._closing: if self._closing:
raise RuntimeError("IOLoop is closing") raise RuntimeError("IOLoop is closing")
list_empty = not self._callbacks list_empty = not self._callbacks
self._callbacks.append(functools.partial( self._callbacks.append(functools.partial(
stack_context.wrap(callback), *args, **kwargs)) stack_context.wrap(callback), *args, **kwargs))
if list_empty and thread.get_ident() != self._thread_ident: if list_empty:
# If we're in the IOLoop's thread, we know it's not currently # If we're not in the IOLoop's thread, and we added the
# polling. If we're not, and we added the first callback to an # first callback to an empty list, we may need to wake it
# empty list, we may need to wake it up (it may wake up on its # up (it may wake up on its own, but an occasional extra
# own, but an occasional extra wake is harmless). Waking # wake is harmless). Waking up a polling IOLoop is
# up a polling IOLoop is relatively expensive, so we try to # relatively expensive, so we try to avoid it when we can.
# avoid it when we can.
self._waker.wake() self._waker.wake()
else:
if self._closing:
raise RuntimeError("IOLoop is closing")
# If we're on the IOLoop's thread, we don't need the lock,
# since we don't need to wake anyone, just add the callback.
# Blindly insert into self._callbacks.
# This is safe because the GIL makes list.append atomic.
# One subtlety is that if the thread is interrupting another
# thread holding the _callback_lock block in IOLoop.start,
# we may modify either the old or new version of self._callbacks,
# but either way will work.
self._callbacks.append(functools.partial(
stack_context.wrap(callback), *args, **kwargs))
def add_callback_from_signal(self, callback, *args, **kwargs): def add_callback_from_signal(self, callback, *args, **kwargs):
with stack_context.NullContext(): with stack_context.NullContext():