Ensure _run_defer() fully executes at least once before shutdown

Without this, it's possible for Waker to be start_received() after the
shutdown signal has already been sent, resulting in 5 second delay
during shutdown.

Additionally mask EBADF during os.write() to waker's write side.
Necessary since nothing synchronizes writer threads from the broker
thread during shutdown. Could be done with a lock instead, but this is
cheaper.
This commit is contained in:
David Wilson 2017-10-08 17:03:11 +05:30
parent 419c8c810f
commit 0481c08beb
1 changed files with 8 additions and 4 deletions

View File

@ -744,15 +744,18 @@ class Waker(BasicStream):
Nothing is written if the current thread is the IO multiplexer thread.
"""
IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd)
if threading.currentThread() != self._broker._thread and \
self.transmit_side.fd:
os.write(self.transmit_side.fd, ' ')
if threading.currentThread() != self._broker._thread:
try:
self.transmit_side.write(' ')
except OSError, e:
if e[0] != errno.EBADF:
raise
def on_receive(self, broker):
"""
Read a byte from the self-pipe.
"""
os.read(self.receive_side.fd, 256)
self.receive_side.read(256)
class IoLogger(BasicStream):
@ -1003,6 +1006,7 @@ class Broker(object):
while self._alive:
self._loop_once()
self._run_defer()
fire(self, 'shutdown')
for side in self._readers | self._writers: