Ensure _run_defer() fully executes at least once before shutdown

Without this, it's possible for Waker to be start_received() after the
shutdown signal has already been sent, resulting in 5 second delay
during shutdown.

Additionally mask EBADF during os.write() to waker's write side.
Necessary since nothing synchronizes writer threads from the broker
thread during shutdown. Could be done with a lock instead, but this is
cheaper.
This commit is contained in:
David Wilson 2017-10-08 17:03:11 +05:30
parent 8c6d861f15
commit baf4380b6d
1 changed files with 8 additions and 4 deletions

View File

@ -744,15 +744,18 @@ class Waker(BasicStream):
Nothing is written if the current thread is the IO multiplexer thread. Nothing is written if the current thread is the IO multiplexer thread.
""" """
IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd) IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd)
if threading.currentThread() != self._broker._thread and \ if threading.currentThread() != self._broker._thread:
self.transmit_side.fd: try:
os.write(self.transmit_side.fd, ' ') self.transmit_side.write(' ')
except OSError, e:
if e[0] != errno.EBADF:
raise
def on_receive(self, broker): def on_receive(self, broker):
""" """
Read a byte from the self-pipe. Read a byte from the self-pipe.
""" """
os.read(self.receive_side.fd, 256) self.receive_side.read(256)
class IoLogger(BasicStream): class IoLogger(BasicStream):
@ -1003,6 +1006,7 @@ class Broker(object):
while self._alive: while self._alive:
self._loop_once() self._loop_once()
self._run_defer()
fire(self, 'shutdown') fire(self, 'shutdown')
for side in self._readers | self._writers: for side in self._readers | self._writers: