From 9bcd2ec56c3162fc5455effa6ffc178c690ca2f1 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 14 Feb 2019 12:36:51 +0000 Subject: [PATCH] issue #542: return of select poller, new selection logic --- mitogen/core.py | 54 ++++++++++++++++---------------- mitogen/parent.py | 73 ++++++++++++++++++++++++++++++++++++-------- tests/poller_test.py | 15 +++++++-- 3 files changed, 98 insertions(+), 44 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index 470b00ca..cfdf996b 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1912,6 +1912,8 @@ class Poller(object): Pollers may only be used by one thread at a time. """ + SUPPORTED = True + # This changed from select() to poll() in Mitogen 0.2.4. Since poll() has # no upper FD limit, it is suitable for use with Latch, which must handle # FDs larger than select's limit during many-host runs. We want this @@ -1928,11 +1930,16 @@ class Poller(object): def __init__(self): self._rfds = {} self._wfds = {} - self._pollobj = select.poll() def __repr__(self): return '%s(%#x)' % (type(self).__name__, id(self)) + def _update(self, fd): + """ + Required by PollPoller subclass. + """ + pass + @property def readers(self): """ @@ -1955,20 +1962,6 @@ class Poller(object): """ pass - _readmask = select.POLLIN | select.POLLHUP - # TODO: no proof we dont need writemask too - - def _update(self, fd): - mask = (((fd in self._rfds) and self._readmask) | - ((fd in self._wfds) and select.POLLOUT)) - if mask: - self._pollobj.register(fd, mask) - else: - try: - self._pollobj.unregister(fd) - except KeyError: - pass - def start_receive(self, fd, data=None): """ Cause :meth:`poll` to yield `data` when `fd` is readable. @@ -2004,22 +1997,27 @@ class Poller(object): self._update(fd) def _poll(self, timeout): + (rfds, wfds, _), _ = io_op(select.select, + self._rfds, + self._wfds, + (), timeout + ) + + for fd in rfds: + _vv and IOLOG.debug('%r: POLLIN for %r', self, fd) + data, gen = self._rfds.get(fd, (None, None)) + if gen and gen < self._generation: + yield data + + for fd in wfds: + _vv and IOLOG.debug('%r: POLLOUT for %r', self, fd) + data, gen = self._wfds.get(fd, (None, None)) + if gen and gen < self._generation: + yield data + if timeout: timeout *= 1000 - events, _ = io_op(self._pollobj.poll, timeout) - for fd, event in events: - if event & self._readmask: - _vv and IOLOG.debug('%r: POLLIN|POLLHUP for %r', self, fd) - data, gen = self._rfds.get(fd, (None, None)) - if gen and gen < self._generation: - yield data - if event & select.POLLOUT: - _vv and IOLOG.debug('%r: POLLOUT for %r', self, fd) - data, gen = self._wfds.get(fd, (None, None)) - if gen and gen < self._generation: - yield data - def poll(self, timeout=None): """ Block the calling thread until one or more FDs are ready for IO. diff --git a/mitogen/parent.py b/mitogen/parent.py index 7e567aaa..f793f234 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -890,10 +890,58 @@ class CallSpec(object): ) +class PollPoller(mitogen.core.Poller): + """ + Poller based on the POSIX poll(2) interface. Not available on some versions + of OS X, otherwise it is the preferred poller for small FD counts. + """ + SUPPORTED = hasattr(select, 'poll') + _repr = 'PollPoller()' + + def __init__(self): + super(PollPoller, self).__init__() + self._pollobj = select.poll() + + # TODO: no proof we dont need writemask too + _readmask = ( + getattr(select, 'POLLIN', 0) | + getattr(select, 'POLLHUP', 0) + ) + + def _update(self, fd): + mask = (((fd in self._rfds) and self._readmask) | + ((fd in self._wfds) and select.POLLOUT)) + if mask: + self._pollobj.register(fd, mask) + else: + try: + self._pollobj.unregister(fd) + except KeyError: + pass + + def _poll(self, timeout): + if timeout: + timeout *= 1000 + + events, _ = mitogen.core.io_op(self._pollobj.poll, timeout) + for fd, event in events: + if event & self._readmask: + IOLOG.debug('%r: POLLIN|POLLHUP for %r', self, fd) + data, gen = self._rfds.get(fd, (None, None)) + if gen and gen < self._generation: + yield data + if event & select.POLLOUT: + IOLOG.debug('%r: POLLOUT for %r', self, fd) + data, gen = self._wfds.get(fd, (None, None)) + if gen and gen < self._generation: + yield data + + class KqueuePoller(mitogen.core.Poller): """ Poller based on the FreeBSD/Darwin kqueue(2) interface. """ + SUPPORTED = hasattr(select, 'kqueue') _repr = 'KqueuePoller()' def __init__(self): @@ -971,6 +1019,7 @@ class EpollPoller(mitogen.core.Poller): """ Poller based on the Linux epoll(2) interface. """ + SUPPORTED = hasattr(select, 'epoll') _repr = 'EpollPoller()' def __init__(self): @@ -1041,20 +1090,18 @@ class EpollPoller(mitogen.core.Poller): yield data -if sys.version_info < (2, 6): - # 2.4 and 2.5 only had select.select() and select.poll(). - POLLER_BY_SYSNAME = {} -else: - POLLER_BY_SYSNAME = { - 'Darwin': KqueuePoller, - 'FreeBSD': KqueuePoller, - 'Linux': EpollPoller, - } +# 2.4 and 2.5 only had select.select() and select.poll(). +for _klass in mitogen.core.Poller, PollPoller, KqueuePoller, EpollPoller: + if _klass.SUPPORTED: + PREFERRED_POLLER = _klass -PREFERRED_POLLER = POLLER_BY_SYSNAME.get( - os.uname()[0], - mitogen.core.Poller, -) +# For apps that start threads dynamically, it's possible Latch will also get +# very high-numbered wait fds when there are many connections, and so select() +# becomes useless there too. So swap in our favourite poller. +if PollPoller.SUPPORTED: + mitogen.core.Latch.poller_class = PollPoller +else: + mitogen.core.Latch.poller_class = PREFERRED_POLLER class DiagLogStream(mitogen.core.BasicStream): diff --git a/tests/poller_test.py b/tests/poller_test.py index 1d1e0cd0..e2e3cdd7 100644 --- a/tests/poller_test.py +++ b/tests/poller_test.py @@ -401,16 +401,25 @@ class SelectTest(AllMixin, testlib.TestCase): klass = mitogen.core.Poller SelectTest = unittest2.skipIf( - condition=not hasattr(select, 'select'), + condition=(not SelectTest.klass.SUPPORTED), reason='select.select() not supported' )(SelectTest) +class PollTest(AllMixin, testlib.TestCase): + klass = mitogen.parent.PollPoller + +PollTest = unittest2.skipIf( + condition=(not PollTest.klass.SUPPORTED), + reason='select.poll() not supported' +)(PollTest) + + class KqueueTest(AllMixin, testlib.TestCase): klass = mitogen.parent.KqueuePoller KqueueTest = unittest2.skipIf( - condition=not hasattr(select, 'kqueue'), + condition=(not KqueueTest.klass.SUPPORTED), reason='select.kqueue() not supported' )(KqueueTest) @@ -419,7 +428,7 @@ class EpollTest(AllMixin, testlib.TestCase): klass = mitogen.parent.EpollPoller EpollTest = unittest2.skipIf( - condition=not hasattr(select, 'epoll'), + condition=(not EpollTest.klass.SUPPORTED), reason='select.epoll() not supported' )(EpollTest)