issue #249: EpollPoller() for Linux.
This commit is contained in:
parent
bc7be1879d
commit
7320c542df
|
@ -542,7 +542,7 @@ class KqueuePoller(Poller):
|
|||
mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, stream)
|
||||
side = stream.receive_side
|
||||
if side.fd in self._reader_by_fd:
|
||||
del self._reader_by_fd[stream.receive_side.fd]
|
||||
del self._reader_by_fd[side.fd]
|
||||
self._control(side, select.KQ_FILTER_READ, select.KQ_EV_DELETE)
|
||||
|
||||
def start_transmit(self, stream):
|
||||
|
@ -577,11 +577,90 @@ class KqueuePoller(Poller):
|
|||
self._call(broker, side.stream, side.stream.on_transmit)
|
||||
|
||||
|
||||
class EpollPoller(Poller):
|
||||
_repr = 'EpollPoller()'
|
||||
|
||||
def __init__(self):
|
||||
self._epoll = select.epoll()
|
||||
self._registered_fds = set()
|
||||
self._reader_by_fd = {}
|
||||
self._writer_by_fd = {}
|
||||
|
||||
@property
|
||||
def readers(self):
|
||||
return list(self._reader_by_fd.values())
|
||||
|
||||
@property
|
||||
def writers(self):
|
||||
return list(self._writer_by_fd.values())
|
||||
|
||||
def _control(self, fd):
|
||||
mitogen.core._vv and IOLOG.debug('%r._control(%r)', self, fd)
|
||||
mask = (((fd in self._reader_by_fd) and select.EPOLLIN) |
|
||||
((fd in self._writer_by_fd) and select.EPOLLOUT))
|
||||
if mask:
|
||||
if fd in self._registered_fds:
|
||||
self._epoll.modify(fd, mask)
|
||||
else:
|
||||
self._epoll.register(fd, mask)
|
||||
self._registered_fds.add(fd)
|
||||
elif fd in self._registered_fds:
|
||||
self._epoll.unregister(fd)
|
||||
self._registered_fds.remove(fd)
|
||||
|
||||
def start_receive(self, stream):
|
||||
mitogen.core._vv and IOLOG.debug('%r.start_receive(%r)', self, stream)
|
||||
side = stream.receive_side
|
||||
assert side and side.fd is not None
|
||||
if side.fd not in self._reader_by_fd:
|
||||
self._reader_by_fd[side.fd] = side
|
||||
self._control(side.fd)
|
||||
|
||||
def stop_receive(self, stream):
|
||||
mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, stream)
|
||||
side = stream.receive_side
|
||||
if side.fd in self._reader_by_fd:
|
||||
del self._reader_by_fd[side.fd]
|
||||
self._control(side.fd)
|
||||
|
||||
def start_transmit(self, stream):
|
||||
mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r)', self, stream)
|
||||
side = stream.transmit_side
|
||||
assert side and side.fd is not None
|
||||
if side.fd not in self._writer_by_fd:
|
||||
self._writer_by_fd[side.fd] = side
|
||||
self._control(side.fd)
|
||||
|
||||
def stop_transmit(self, stream):
|
||||
mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, stream)
|
||||
side = stream.transmit_side
|
||||
if side.fd in self._writer_by_fd:
|
||||
del self._writer_by_fd[side.fd]
|
||||
self._control(side.fd)
|
||||
|
||||
def poll(self, broker, timeout=None):
|
||||
the_timeout = -1
|
||||
if timeout is not None:
|
||||
the_timeout = timeout
|
||||
|
||||
for fd, ev in self._epoll.poll(the_timeout):
|
||||
if ev & select.EPOLLIN:
|
||||
side = self._reader_by_fd.get(fd)
|
||||
# Events can still be read for an already-discarded fd.
|
||||
if side:
|
||||
mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, side)
|
||||
self._call(broker, side.stream, side.stream.on_receive)
|
||||
elif ev & select.EPOLLOUT:
|
||||
side = self._writer_by_fd.get(fd)
|
||||
if side:
|
||||
mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, side)
|
||||
self._call(broker, side.stream, side.stream.on_transmit)
|
||||
|
||||
|
||||
POLLER_BY_SYSNAME = {
|
||||
'Darwin': KqueuePoller,
|
||||
'FreeBSD': KqueuePoller,
|
||||
#'Linux': EpollPoller,
|
||||
'Linux': EpollPoller,
|
||||
}
|
||||
PREFERRED_POLLER = POLLER_BY_SYSNAME.get(os.uname()[0], mitogen.core.Poller)
|
||||
|
||||
|
|
Loading…
Reference in New Issue