issue #249: have upgrade_router() upgrade the poller too.

Now when a child becomes a parent, it gets a new poller suitable for
many more children than was possible using select().
This commit is contained in:
David Wilson 2018-05-15 09:28:03 +01:00
parent 55fff54774
commit 7d0209d8de
1 changed files with 37 additions and 14 deletions

View File

@ -422,8 +422,41 @@ def discard_until(fd, s, deadline):
return return
def _upgrade_broker(broker):
"""
Extract the poller state from Broker and replace it with the industrial
strength poller for this OS. Must run on the Broker thread.
"""
# This function is deadly! The act of calling start_receive() generates log
# messages which must be silenced as the upgrade progresses, otherwise the
# poller state will change as it is copied, resulting in write fds that are
# lost. (Due to LogHandler->Router->Stream->Broker->Poller, where Stream
# only calls start_transmit() when transitioning from empty to non-empty
# buffer. If the start_transmit() is lost, writes from the child hang
# permanently).
root = logging.getLogger()
old_level = root.level
root.setLevel(logging.CRITICAL)
old = broker.poller
new = PREFERRED_POLLER()
for fd, data in old.readers:
new.start_receive(fd, data)
for fd, data in old.writers:
new.start_transmit(fd, data)
old.close()
broker.poller = new
root.setLevel(old_level)
LOG.debug('replaced %r with %r (new: %d readers, %d writers; '
'old: %d readers, %d writers)', old, new,
len(new.readers), len(new.writers),
len(old.readers), len(old.writers))
def upgrade_router(econtext): def upgrade_router(econtext):
if not isinstance(econtext.router, Router): # TODO if not isinstance(econtext.router, Router): # TODO
econtext.broker.defer(_upgrade_broker, econtext.broker)
econtext.router.__class__ = Router # TODO econtext.router.__class__ = Router # TODO
econtext.router.upgrade( econtext.router.upgrade(
importer=econtext.importer, importer=econtext.importer,
@ -500,17 +533,7 @@ class Argv(object):
return ' '.join(map(self.escape, self.argv)) return ' '.join(map(self.escape, self.argv))
class Poller(mitogen.core.Poller): class KqueuePoller(mitogen.core.Poller):
@classmethod
def from_existing(cls, poller):
self = cls()
for fd, data in poller.readers:
self.start_receive(fd, data)
for fd, data in poller.writers:
self.start_transmit(fd, data)
class KqueuePoller(Poller):
_repr = 'KqueuePoller()' _repr = 'KqueuePoller()'
def __init__(self): def __init__(self):
@ -577,11 +600,11 @@ class KqueuePoller(Poller):
yield self._wfds[fd] yield self._wfds[fd]
class EpollPoller(Poller): class EpollPoller(mitogen.core.Poller):
_repr = 'EpollPoller()' _repr = 'EpollPoller()'
def __init__(self): def __init__(self):
self._epoll = select.epoll() self._epoll = select.epoll(32)
self._registered_fds = set() self._registered_fds = set()
self._rfds = {} self._rfds = {}
self._wfds = {} self._wfds = {}
@ -641,7 +664,7 @@ class EpollPoller(Poller):
if timeout is not None: if timeout is not None:
the_timeout = timeout the_timeout = timeout
events, _ = mitogen.core.io_op(self._epoll.poll, the_timeout) events, _ = mitogen.core.io_op(self._epoll.poll, the_timeout, 32)
for fd, event in events: for fd, event in events:
if event & self._inmask and fd in self._rfds: if event & self._inmask and fd in self._rfds:
# Events can still be read for an already-discarded fd. # Events can still be read for an already-discarded fd.