From b6124f83965e3f3fd2fff63643c92f75202de902 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 14 May 2018 22:50:35 +0000 Subject: [PATCH] issue #249: EpollPoller v2. --- mitogen/parent.py | 99 ++++++++++++++++++++--------------------------- 1 file changed, 42 insertions(+), 57 deletions(-) diff --git a/mitogen/parent.py b/mitogen/parent.py index 89a7cf8b..5c815fe9 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -497,7 +497,6 @@ class Argv(object): return ' '.join(map(self.escape, self.argv)) - class Poller(mitogen.core.Poller): @classmethod def from_existing(cls, poller): @@ -538,7 +537,7 @@ class KqueuePoller(Poller): self._rfds[fd] = data or fd def stop_receive(self, fd): - mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, fd, data) + mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, fd) if fd in self._rfds: self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE) del self._rfds[fd] @@ -559,15 +558,14 @@ class KqueuePoller(Poller): changelist = self._changelist self._changelist = [] for event in self._kqueue.control(changelist, 32, timeout): - if event.filter == select.KQ_FILTER_READ: - if event.ident in self._rfds: - # Events can still be read for an already-discarded fd. - mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, side) - yield self._rfds[event.ident] - elif event.filter == select.KQ_FILTER_WRITE: - if event.ident in self._wfds: - mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, side) - yield self._wfds[event.ident] + fd = event.ident + if event.filter == select.KQ_FILTER_READ and fd in self._rfds: + # Events can still be read for an already-discarded fd. + mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd) + yield self._rfds[fd] + elif event.filter == select.KQ_FILTER_WRITE and fd in self._wfds: + mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd) + yield self._wfds[fd] class EpollPoller(Poller): @@ -576,21 +574,21 @@ class EpollPoller(Poller): def __init__(self): self._epoll = select.epoll() self._registered_fds = set() - self._reader_by_fd = {} - self._writer_by_fd = {} + self._rfds = {} + self._wfds = {} @property def readers(self): - return list(self._reader_by_fd.values()) + return list(self._rfds.items()) @property def writers(self): - return list(self._writer_by_fd.values()) + return list(self._wfds.items()) def _control(self, fd): mitogen.core._vv and IOLOG.debug('%r._control(%r)', self, fd) - mask = (((fd in self._reader_by_fd) and select.EPOLLIN) | - ((fd in self._writer_by_fd) and select.EPOLLOUT)) + mask = (((fd in self._rfds) and select.EPOLLIN) | + ((fd in self._wfds) and select.EPOLLOUT)) if mask: if fd in self._registered_fds: self._epoll.modify(fd, mask) @@ -601,59 +599,47 @@ class EpollPoller(Poller): self._epoll.unregister(fd) self._registered_fds.remove(fd) - def start_receive(self, stream): - mitogen.core._vv and IOLOG.debug('%r.start_receive(%r)', self, stream) - side = stream.receive_side - assert side and side.fd is not None - if side.fd not in self._reader_by_fd: - self._reader_by_fd[side.fd] = side - self._control(side.fd) + def start_receive(self, fd, data=None): + mitogen.core._vv and IOLOG.debug('%r.start_receive(%r, %r)', + self, fd, data) + self._rfds[fd] = data or fd + self._control(fd) - def stop_receive(self, stream): - mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, stream) - side = stream.receive_side - if side.fd in self._reader_by_fd: - del self._reader_by_fd[side.fd] - self._control(side.fd) + def stop_receive(self, fd): + mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, fd) + self._rfds.pop(fd, None) + self._control(fd) - def start_transmit(self, stream): - mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r)', self, stream) - side = stream.transmit_side - assert side and side.fd is not None - if side.fd not in self._writer_by_fd: - self._writer_by_fd[side.fd] = side - self._control(side.fd) + def start_transmit(self, fd, data=None): + mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r, %r)', + self, fd, data) + self._wfds[fd] = data or fd + self._control(fd) - def stop_transmit(self, stream): - mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, stream) - side = stream.transmit_side - if side.fd in self._writer_by_fd: - del self._writer_by_fd[side.fd] - self._control(side.fd) + def stop_transmit(self, fd): + mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, fd) + self._wfds.pop(fd, None) + self._control(fd) def poll(self, broker, timeout=None): the_timeout = -1 if timeout is not None: the_timeout = timeout - for fd, ev in self._epoll.poll(the_timeout): - if ev & select.EPOLLIN: - side = self._reader_by_fd.get(fd) + for fd, event in self._epoll.poll(the_timeout): + if event & select.EPOLLIN and fd in self._rfds: # Events can still be read for an already-discarded fd. - if side: - mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, side) - self._call(broker, side.stream, side.stream.on_receive) - elif ev & select.EPOLLOUT: - side = self._writer_by_fd.get(fd) - if side: - mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, side) - self._call(broker, side.stream, side.stream.on_transmit) + mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd) + yield self._rfds[fd] + elif event & select.EPOLLOUT and fd in self._wfds: + mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd) + yield self._wfds[fd] POLLER_BY_SYSNAME = { 'Darwin': KqueuePoller, 'FreeBSD': KqueuePoller, - #'Linux': EpollPoller, + 'Linux': EpollPoller, } PREFERRED_POLLER = POLLER_BY_SYSNAME.get(os.uname()[0], mitogen.core.Poller) @@ -1030,7 +1016,7 @@ class RouteMonitor(object): def _on_stream_disconnect(self, stream): """ - Respond to disconnection of a local stream by + Respond to disconnection of a local stream by """ LOG.debug('%r is gone; propagating DEL_ROUTE for %r', stream, stream.routes) @@ -1172,7 +1158,6 @@ class Router(mitogen.core.Router): try: stream.connect() except mitogen.core.TimeoutError: - e = sys.exc_info()[1] raise mitogen.core.StreamError(self.connection_timeout_msg) context.name = stream.name self.route_monitor.notice_stream(stream)