issue #249: EpollPoller v2.

This commit is contained in:
David Wilson 2018-05-14 22:50:35 +00:00
parent 9abcf63155
commit b6124f8396
1 changed files with 42 additions and 57 deletions

View File

@ -497,7 +497,6 @@ class Argv(object):
return ' '.join(map(self.escape, self.argv)) return ' '.join(map(self.escape, self.argv))
class Poller(mitogen.core.Poller): class Poller(mitogen.core.Poller):
@classmethod @classmethod
def from_existing(cls, poller): def from_existing(cls, poller):
@ -538,7 +537,7 @@ class KqueuePoller(Poller):
self._rfds[fd] = data or fd self._rfds[fd] = data or fd
def stop_receive(self, fd): def stop_receive(self, fd):
mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, fd, data) mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, fd)
if fd in self._rfds: if fd in self._rfds:
self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE) self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)
del self._rfds[fd] del self._rfds[fd]
@ -559,15 +558,14 @@ class KqueuePoller(Poller):
changelist = self._changelist changelist = self._changelist
self._changelist = [] self._changelist = []
for event in self._kqueue.control(changelist, 32, timeout): for event in self._kqueue.control(changelist, 32, timeout):
if event.filter == select.KQ_FILTER_READ: fd = event.ident
if event.ident in self._rfds: if event.filter == select.KQ_FILTER_READ and fd in self._rfds:
# Events can still be read for an already-discarded fd. # Events can still be read for an already-discarded fd.
mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, side) mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd)
yield self._rfds[event.ident] yield self._rfds[fd]
elif event.filter == select.KQ_FILTER_WRITE: elif event.filter == select.KQ_FILTER_WRITE and fd in self._wfds:
if event.ident in self._wfds: mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd)
mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, side) yield self._wfds[fd]
yield self._wfds[event.ident]
class EpollPoller(Poller): class EpollPoller(Poller):
@ -576,21 +574,21 @@ class EpollPoller(Poller):
def __init__(self): def __init__(self):
self._epoll = select.epoll() self._epoll = select.epoll()
self._registered_fds = set() self._registered_fds = set()
self._reader_by_fd = {} self._rfds = {}
self._writer_by_fd = {} self._wfds = {}
@property @property
def readers(self): def readers(self):
return list(self._reader_by_fd.values()) return list(self._rfds.items())
@property @property
def writers(self): def writers(self):
return list(self._writer_by_fd.values()) return list(self._wfds.items())
def _control(self, fd): def _control(self, fd):
mitogen.core._vv and IOLOG.debug('%r._control(%r)', self, fd) mitogen.core._vv and IOLOG.debug('%r._control(%r)', self, fd)
mask = (((fd in self._reader_by_fd) and select.EPOLLIN) | mask = (((fd in self._rfds) and select.EPOLLIN) |
((fd in self._writer_by_fd) and select.EPOLLOUT)) ((fd in self._wfds) and select.EPOLLOUT))
if mask: if mask:
if fd in self._registered_fds: if fd in self._registered_fds:
self._epoll.modify(fd, mask) self._epoll.modify(fd, mask)
@ -601,59 +599,47 @@ class EpollPoller(Poller):
self._epoll.unregister(fd) self._epoll.unregister(fd)
self._registered_fds.remove(fd) self._registered_fds.remove(fd)
def start_receive(self, stream): def start_receive(self, fd, data=None):
mitogen.core._vv and IOLOG.debug('%r.start_receive(%r)', self, stream) mitogen.core._vv and IOLOG.debug('%r.start_receive(%r, %r)',
side = stream.receive_side self, fd, data)
assert side and side.fd is not None self._rfds[fd] = data or fd
if side.fd not in self._reader_by_fd: self._control(fd)
self._reader_by_fd[side.fd] = side
self._control(side.fd)
def stop_receive(self, stream): def stop_receive(self, fd):
mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, stream) mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, fd)
side = stream.receive_side self._rfds.pop(fd, None)
if side.fd in self._reader_by_fd: self._control(fd)
del self._reader_by_fd[side.fd]
self._control(side.fd)
def start_transmit(self, stream): def start_transmit(self, fd, data=None):
mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r)', self, stream) mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r, %r)',
side = stream.transmit_side self, fd, data)
assert side and side.fd is not None self._wfds[fd] = data or fd
if side.fd not in self._writer_by_fd: self._control(fd)
self._writer_by_fd[side.fd] = side
self._control(side.fd)
def stop_transmit(self, stream): def stop_transmit(self, fd):
mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, stream) mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, fd)
side = stream.transmit_side self._wfds.pop(fd, None)
if side.fd in self._writer_by_fd: self._control(fd)
del self._writer_by_fd[side.fd]
self._control(side.fd)
def poll(self, broker, timeout=None): def poll(self, broker, timeout=None):
the_timeout = -1 the_timeout = -1
if timeout is not None: if timeout is not None:
the_timeout = timeout the_timeout = timeout
for fd, ev in self._epoll.poll(the_timeout): for fd, event in self._epoll.poll(the_timeout):
if ev & select.EPOLLIN: if event & select.EPOLLIN and fd in self._rfds:
side = self._reader_by_fd.get(fd)
# Events can still be read for an already-discarded fd. # Events can still be read for an already-discarded fd.
if side: mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd)
mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, side) yield self._rfds[fd]
self._call(broker, side.stream, side.stream.on_receive) elif event & select.EPOLLOUT and fd in self._wfds:
elif ev & select.EPOLLOUT: mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd)
side = self._writer_by_fd.get(fd) yield self._wfds[fd]
if side:
mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, side)
self._call(broker, side.stream, side.stream.on_transmit)
POLLER_BY_SYSNAME = { POLLER_BY_SYSNAME = {
'Darwin': KqueuePoller, 'Darwin': KqueuePoller,
'FreeBSD': KqueuePoller, 'FreeBSD': KqueuePoller,
#'Linux': EpollPoller, 'Linux': EpollPoller,
} }
PREFERRED_POLLER = POLLER_BY_SYSNAME.get(os.uname()[0], mitogen.core.Poller) PREFERRED_POLLER = POLLER_BY_SYSNAME.get(os.uname()[0], mitogen.core.Poller)
@ -1172,7 +1158,6 @@ class Router(mitogen.core.Router):
try: try:
stream.connect() stream.connect()
except mitogen.core.TimeoutError: except mitogen.core.TimeoutError:
e = sys.exc_info()[1]
raise mitogen.core.StreamError(self.connection_timeout_msg) raise mitogen.core.StreamError(self.connection_timeout_msg)
context.name = stream.name context.name = stream.name
self.route_monitor.notice_stream(stream) self.route_monitor.notice_stream(stream)