diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py index e9acc923..8325ae7b 100644 --- a/kombu/utils/eventio.py +++ b/kombu/utils/eventio.py @@ -10,7 +10,7 @@ from __future__ import absolute_import import errno import socket -from select import select as _selectf +from select import select as _selectf, error as _selecterr try: from select import epoll @@ -53,6 +53,11 @@ READ = POLL_READ = 0x001 WRITE = POLL_WRITE = 0x004 ERR = POLL_ERR = 0x008 | 0x010 +try: + SELECT_BAD_FD = set((errno.EBADF, errno.WSAENOTSOCK)) +except AttributeError: + SELECT_BAD_FD = set((errno.EBADF,)) + class Poller(object): @@ -79,11 +84,9 @@ class _epoll(Poller): def unregister(self, fd): try: self._epoll.unregister(fd) - except socket.error: + except (socket.error, ValueError, KeyError): pass - except ValueError: - pass - except IOError, exc: + except (IOError, OSError), exc: if get_errno(exc) != errno.ENOENT: raise @@ -191,13 +194,30 @@ class _select(Poller): if events & READ: self._rfd.add(fd) + def _remove_bad(self): + for fd in self._rfd | self._wfd | self._efd: + try: + _selectf([fd], [], [], 0) + except _selecterr, exc: + if get_errno(exc) in SELECT_BAD_FD: + self.unregister(fd) + def unregister(self, fd): self._rfd.discard(fd) self._wfd.discard(fd) self._efd.discard(fd) def _poll(self, timeout): - read, write, error = _selectf(self._rfd, self._wfd, self._efd, timeout) + try: + read, write, error = _selectf( + self._rfd, self._wfd, self._efd, timeout, + ) + except _selecterr, exc: + if get_errno(exc) == errno.EINTR: + return + elif get_errno(exc) in SELECT_BAD_FD: + self._remove_bad() + events = {} for fd in read: if not isinstance(fd, int):