diff --git a/kombu/async/hub.py b/kombu/async/hub.py index 306fdd31..c9b6e1ff 100644 --- a/kombu/async/hub.py +++ b/kombu/async/hub.py @@ -272,38 +272,36 @@ class Hub(object): item() poll_timeout = fire_timers(propagate=propagate) if scheduled else 1 - #print('[[[HUB]]]: %s' % (self.repr_active(), )) if readers or writers: to_consolidate = [] try: events = poll(poll_timeout) - #print('[EVENTS]: %s' % (self.nepr_events(events or []), )) except ValueError: # Issue 882 raise StopIteration() - for fileno, event in events or (): + for fd, event in events or (): if fileno in consolidate and \ writers.get(fileno) is None: - to_consolidate.append(fileno) + to_consolidate.append(fd) continue cb = cbargs = None if event & READ: try: - cb, cbargs = readers[fileno] + cb, cbargs = readers[fd] except KeyError: - self.remove_reader(fileno) + self.remove_reader(fd) continue elif event & WRITE: try: - cb, cbargs = writers[fileno] + cb, cbargs = writers[fd] except KeyError: - self.remove_writer(fileno) + self.remove_writer(fd) continue elif event & ERR: try: - cb, cbargs = (readers.get(fileno) or - writers.get(fileno)) + cb, cbargs = (readers.get(fd) or + writers.get(fd)) except TypeError: pass @@ -315,11 +313,11 @@ class Hub(object): except OSError as exc: if get_errno(exc) != errno.EBADF: raise - hub_remove(fileno) + hub_remove(fd) except StopIteration: pass except Exception: - hub_remove(fileno) + hub_remove(fd) raise else: try: diff --git a/kombu/five.py b/kombu/five.py index 33c77fe5..87abc09c 100644 --- a/kombu/five.py +++ b/kombu/five.py @@ -10,7 +10,7 @@ """ from __future__ import absolute_import -############## py3k ######################################################### +# ############# py3k ######################################################### import sys PY3 = sys.version_info[0] == 3 @@ -34,7 +34,7 @@ try: except NameError: # pragma: no cover bytes_t = str # noqa -############## time.monotonic ################################################ +# ############# time.monotonic ############################################### if sys.version_info < (3, 3): @@ -89,7 +89,7 @@ try: except ImportError: monotonic = _monotonic # noqa -############## Py3 <-> Py2 ################################################### +# ############# Py3 <-> Py2 ################################################## if PY3: # pragma: no cover import builtins diff --git a/kombu/pidbox.py b/kombu/pidbox.py index 5c70a382..3a313115 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -210,14 +210,14 @@ class Mailbox(object): def get_reply_queue(self): oid = self.oid - return Queue('%s.%s' % (oid, self.reply_exchange.name), - exchange=self.reply_exchange, - routing_key=oid, - durable=False, - auto_delete=True, - queue_arguments={ - 'x-expires': int(REPLY_QUEUE_EXPIRES * 1000), - }) + return Queue( + '%s.%s' % (oid, self.reply_exchange.name), + exchange=self.reply_exchange, + routing_key=oid, + durable=False, + auto_delete=True, + queue_arguments={'x-expires': int(REPLY_QUEUE_EXPIRES * 1000)}, + ) @cached_property def reply_queue(self): diff --git a/kombu/syn.py b/kombu/syn.py index 7f6e8099..01b4d479 100644 --- a/kombu/syn.py +++ b/kombu/syn.py @@ -21,7 +21,7 @@ def select_blocking_method(type): def _detect_environment(): - ## -eventlet- + # ## -eventlet- if 'eventlet' in sys.modules: try: from eventlet.patcher import is_monkey_patched as is_eventlet @@ -32,7 +32,7 @@ def _detect_environment(): except ImportError: pass - # -gevent- + # ## -gevent- if 'gevent' in sys.modules: try: from gevent import socket as _gsocket diff --git a/kombu/tests/test_mixins.py b/kombu/tests/test_mixins.py index b80f0131..53c423df 100644 --- a/kombu/tests/test_mixins.py +++ b/kombu/tests/test_mixins.py @@ -90,7 +90,6 @@ class test_ConsumerMixin(Case): def test_Consumer_context(self): c, Acons, Bcons = self._context() - _connref = _chanref = None with c.Consumer() as (conn, channel, consumer): self.assertIs(conn, c.connection) @@ -104,7 +103,6 @@ class test_ConsumerMixin(Case): self.assertIs(subcons.channel, conn.default_channel) Acons.__enter__.assert_called_with() Bcons.__enter__.assert_called_with() - _connref, _chanref = conn, channel c.on_consume_end.assert_called_with(conn, channel) diff --git a/kombu/tests/test_syn.py b/kombu/tests/test_syn.py index 551e5544..34e58035 100644 --- a/kombu/tests/test_syn.py +++ b/kombu/tests/test_syn.py @@ -38,9 +38,12 @@ class test_syn(Case): def test_detect_environment_gevent(self): with patch('gevent.socket', create=True) as m: prev, socket.socket = socket.socket, m.socket - self.assertTrue(sys.modules['gevent']) - env = syn._detect_environment() - self.assertEqual(env, 'gevent') + try: + self.assertTrue(sys.modules['gevent']) + env = syn._detect_environment() + self.assertEqual(env, 'gevent') + finally: + socket.socket = prev def test_detect_environment_no_eventlet_or_gevent(self): try: diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py index d249c4e7..f872356f 100644 --- a/kombu/tests/transport/virtual/test_base.py +++ b/kombu/tests/transport/virtual/test_base.py @@ -267,8 +267,8 @@ class test_Channel(Case): c.exchange_declare(n) c.queue_declare(n) c.queue_bind(n, n, n) - c.queue_bind(n, n, n) # tests code path that returns - # if queue already bound. + # tests code path that returns if queue already bound. + c.queue_bind(n, n, n) c.queue_delete(n, if_empty=True) self.assertIn(n, c.state.bindings) diff --git a/kombu/transport/django/__init__.py b/kombu/transport/django/__init__.py index 67bfa576..e859f3fd 100644 --- a/kombu/transport/django/__init__.py +++ b/kombu/transport/django/__init__.py @@ -35,7 +35,6 @@ class Channel(virtual.Channel): super(Channel, self).basic_consume(queue, *args, **kwargs) def _get(self, queue): - #self.refresh_connection() m = Queue.objects.fetch(queue) if m: return loads(bytes_to_str(m)) diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 78af0f9f..fd048188 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -207,7 +207,7 @@ class Channel(virtual.Channel): self.get_broadcast().ensure_index([('queue', 1)]) self.get_routing().ensure_index([('queue', 1), ('exchange', 1)]) - #TODO Store a more complete exchange metatable in the routing collection + # TODO Store a more complete exchange metatable in the routing collection def get_table(self, exchange): """Get table of bindings for ``exchange``.""" localRoutes = frozenset(self.state.exchanges[exchange]['table']) diff --git a/kombu/utils/compat.py b/kombu/utils/compat.py index 16028997..d0c3e676 100644 --- a/kombu/utils/compat.py +++ b/kombu/utils/compat.py @@ -8,7 +8,7 @@ Helps compatibility with older Python versions. from __future__ import absolute_import -############## timedelta_seconds() -> delta.total_seconds #################### +# ############# timedelta_seconds() -> delta.total_seconds ################### from datetime import timedelta HAVE_TIMEDELTA_TOTAL_SECONDS = hasattr(timedelta, 'total_seconds') @@ -36,7 +36,7 @@ else: # pragma: no cover return 0 return delta.days * 86400 + delta.seconds + (delta.microseconds / 10e5) -############## socket.error.errno ############################################ +# ############# socket.error.errno ########################################### def get_errno(exc): @@ -53,7 +53,7 @@ def get_errno(exc): pass return 0 -############## collections.OrderedDict ####################################### +# ############# collections.OrderedDict ###################################### try: from collections import OrderedDict except ImportError: