This commit is contained in:
Ask Solem 2014-06-02 17:44:46 +01:00
parent dfed47af3d
commit bb6d70eeda
10 changed files with 35 additions and 37 deletions

View File

@ -272,38 +272,36 @@ class Hub(object):
item()
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
#print('[[[HUB]]]: %s' % (self.repr_active(), ))
if readers or writers:
to_consolidate = []
try:
events = poll(poll_timeout)
#print('[EVENTS]: %s' % (self.nepr_events(events or []), ))
except ValueError: # Issue 882
raise StopIteration()
for fileno, event in events or ():
for fd, event in events or ():
if fileno in consolidate and \
writers.get(fileno) is None:
to_consolidate.append(fileno)
to_consolidate.append(fd)
continue
cb = cbargs = None
if event & READ:
try:
cb, cbargs = readers[fileno]
cb, cbargs = readers[fd]
except KeyError:
self.remove_reader(fileno)
self.remove_reader(fd)
continue
elif event & WRITE:
try:
cb, cbargs = writers[fileno]
cb, cbargs = writers[fd]
except KeyError:
self.remove_writer(fileno)
self.remove_writer(fd)
continue
elif event & ERR:
try:
cb, cbargs = (readers.get(fileno) or
writers.get(fileno))
cb, cbargs = (readers.get(fd) or
writers.get(fd))
except TypeError:
pass
@ -315,11 +313,11 @@ class Hub(object):
except OSError as exc:
if get_errno(exc) != errno.EBADF:
raise
hub_remove(fileno)
hub_remove(fd)
except StopIteration:
pass
except Exception:
hub_remove(fileno)
hub_remove(fd)
raise
else:
try:

View File

@ -10,7 +10,7 @@
"""
from __future__ import absolute_import
############## py3k #########################################################
# ############# py3k #########################################################
import sys
PY3 = sys.version_info[0] == 3
@ -34,7 +34,7 @@ try:
except NameError: # pragma: no cover
bytes_t = str # noqa
############## time.monotonic ################################################
# ############# time.monotonic ###############################################
if sys.version_info < (3, 3):
@ -89,7 +89,7 @@ try:
except ImportError:
monotonic = _monotonic # noqa
############## Py3 <-> Py2 ###################################################
# ############# Py3 <-> Py2 ##################################################
if PY3: # pragma: no cover
import builtins

View File

@ -210,14 +210,14 @@ class Mailbox(object):
def get_reply_queue(self):
oid = self.oid
return Queue('%s.%s' % (oid, self.reply_exchange.name),
exchange=self.reply_exchange,
routing_key=oid,
durable=False,
auto_delete=True,
queue_arguments={
'x-expires': int(REPLY_QUEUE_EXPIRES * 1000),
})
return Queue(
'%s.%s' % (oid, self.reply_exchange.name),
exchange=self.reply_exchange,
routing_key=oid,
durable=False,
auto_delete=True,
queue_arguments={'x-expires': int(REPLY_QUEUE_EXPIRES * 1000)},
)
@cached_property
def reply_queue(self):

View File

@ -21,7 +21,7 @@ def select_blocking_method(type):
def _detect_environment():
## -eventlet-
# ## -eventlet-
if 'eventlet' in sys.modules:
try:
from eventlet.patcher import is_monkey_patched as is_eventlet
@ -32,7 +32,7 @@ def _detect_environment():
except ImportError:
pass
# -gevent-
# ## -gevent-
if 'gevent' in sys.modules:
try:
from gevent import socket as _gsocket

View File

@ -90,7 +90,6 @@ class test_ConsumerMixin(Case):
def test_Consumer_context(self):
c, Acons, Bcons = self._context()
_connref = _chanref = None
with c.Consumer() as (conn, channel, consumer):
self.assertIs(conn, c.connection)
@ -104,7 +103,6 @@ class test_ConsumerMixin(Case):
self.assertIs(subcons.channel, conn.default_channel)
Acons.__enter__.assert_called_with()
Bcons.__enter__.assert_called_with()
_connref, _chanref = conn, channel
c.on_consume_end.assert_called_with(conn, channel)

View File

@ -38,9 +38,12 @@ class test_syn(Case):
def test_detect_environment_gevent(self):
with patch('gevent.socket', create=True) as m:
prev, socket.socket = socket.socket, m.socket
self.assertTrue(sys.modules['gevent'])
env = syn._detect_environment()
self.assertEqual(env, 'gevent')
try:
self.assertTrue(sys.modules['gevent'])
env = syn._detect_environment()
self.assertEqual(env, 'gevent')
finally:
socket.socket = prev
def test_detect_environment_no_eventlet_or_gevent(self):
try:

View File

@ -267,8 +267,8 @@ class test_Channel(Case):
c.exchange_declare(n)
c.queue_declare(n)
c.queue_bind(n, n, n)
c.queue_bind(n, n, n) # tests code path that returns
# if queue already bound.
# tests code path that returns if queue already bound.
c.queue_bind(n, n, n)
c.queue_delete(n, if_empty=True)
self.assertIn(n, c.state.bindings)

View File

@ -35,7 +35,6 @@ class Channel(virtual.Channel):
super(Channel, self).basic_consume(queue, *args, **kwargs)
def _get(self, queue):
#self.refresh_connection()
m = Queue.objects.fetch(queue)
if m:
return loads(bytes_to_str(m))

View File

@ -207,7 +207,7 @@ class Channel(virtual.Channel):
self.get_broadcast().ensure_index([('queue', 1)])
self.get_routing().ensure_index([('queue', 1), ('exchange', 1)])
#TODO Store a more complete exchange metatable in the routing collection
# TODO Store a more complete exchange metatable in the routing collection
def get_table(self, exchange):
"""Get table of bindings for ``exchange``."""
localRoutes = frozenset(self.state.exchanges[exchange]['table'])

View File

@ -8,7 +8,7 @@ Helps compatibility with older Python versions.
from __future__ import absolute_import
############## timedelta_seconds() -> delta.total_seconds ####################
# ############# timedelta_seconds() -> delta.total_seconds ###################
from datetime import timedelta
HAVE_TIMEDELTA_TOTAL_SECONDS = hasattr(timedelta, 'total_seconds')
@ -36,7 +36,7 @@ else: # pragma: no cover
return 0
return delta.days * 86400 + delta.seconds + (delta.microseconds / 10e5)
############## socket.error.errno ############################################
# ############# socket.error.errno ###########################################
def get_errno(exc):
@ -53,7 +53,7 @@ def get_errno(exc):
pass
return 0
############## collections.OrderedDict #######################################
# ############# collections.OrderedDict ######################################
try:
from collections import OrderedDict
except ImportError: