diff --git a/AUTHORS b/AUTHORS index f71d658e..54fe0a17 100644 --- a/AUTHORS +++ b/AUTHORS @@ -20,6 +20,7 @@ C Anthony Risinger Christophe Chauvet Christopher Grebs Clay Gerrard +Dan LaMotte Dan McGee Dane Guempel David Clymer @@ -61,6 +62,7 @@ Petar Radosevic Peter Hoffmann Pierre Riteau Rafael Duran Castaneda +Rafal Malinowski Ralf Nyren Rob Ottaway Rumyana Neykova diff --git a/Changelog b/Changelog index 3c89d029..51fbc9a9 100644 --- a/Changelog +++ b/Changelog @@ -19,6 +19,27 @@ >>> from kombu import enable_insecure_serializers >>> enable_insecure_serializers() +.. _version-2.5.11: + +2.5.11 +====== +:release-date: TBA + +- Adds ``passive`` option to :class:`~kombu.Exchange`. + + Contributed by Rafal Malinowski + +- librabbitmq: Now raises :exc:`NotImplementedError` + if ``ssl`` option enabled. + + The librabbitmq library does not support ssl, + but you can use stunnel or change to the ``pyamqp://`` transport + instead. + + Fix contributed by Dan LaMotte. + +- eventio: select implementation now removes bad file descriptors. + .. _version-2.5.10: 2.5.10 diff --git a/funtests/setup.py b/funtests/setup.py index a7ad8b4e..9bea0b97 100644 --- a/funtests/setup.py +++ b/funtests/setup.py @@ -31,38 +31,38 @@ But you can execute the tests by running the command: setup( name='kombu-funtests', - version="DEV", - description="Functional test suite for Kombu", - author="Ask Solem", - author_email="ask@celeryproject.org", - url="http://github.com/celery/kombu", - platforms=["any"], + version='DEV', + description='Functional test suite for Kombu', + author='Ask Solem', + author_email='ask@celeryproject.org', + url='http://github.com/celery/kombu', + platforms=['any'], packages=[], data_files=[], zip_safe=False, - cmdclass={"install": no_install}, - test_suite="nose.collector", + cmdclass={'install': no_install}, + test_suite='nose.collector', build_requires=[ - "nose", - "nose-cover3", - "unittest2", - "coverage>=3.0", - "simplejson", - "PyYAML", - "msgpack-python", - "pymongo", - "couchdb", - "kazoo", - "beanstalkc", - "kombu-sqlalchemy", - "django", - "django-kombu", + 'nose', + 'nose-cover3', + 'unittest2', + 'coverage>=3.0', + 'simplejson', + 'PyYAML', + 'msgpack-python', + 'pymongo', + 'couchdb', + 'kazoo', + 'beanstalkc', + 'kombu-sqlalchemy', + 'django', + 'django-kombu', ], classifiers=[ - "Operating System :: OS Independent", - "Programming Language :: Python", - "License :: OSI Approved :: BSD License", - "Intended Audience :: Developers", + 'Operating System :: OS Independent', + 'Programming Language :: Python', + 'License :: OSI Approved :: BSD License', + 'Intended Audience :: Developers', ], - long_description="Do not install this package", + long_description='Do not install this package', ) diff --git a/funtests/tests/__init__.py b/funtests/tests/__init__.py index 0300fda8..41cbef67 100644 --- a/funtests/tests/__init__.py +++ b/funtests/tests/__init__.py @@ -1,8 +1,6 @@ import os import sys -print("HELLO") - sys.path.insert(0, os.path.join(os.getcwd(), os.pardir)) print(sys.path[0]) sys.path.insert(0, os.getcwd()) diff --git a/funtests/tests/test_SQS.py b/funtests/tests/test_SQS.py index c063a161..ebcbce2b 100644 --- a/funtests/tests/test_SQS.py +++ b/funtests/tests/test_SQS.py @@ -6,8 +6,8 @@ from funtests import transport class test_SQS(transport.TransportCase): - transport = "SQS" - prefix = "sqs" + transport = 'SQS' + prefix = 'sqs' event_loop_max = 100 message_size_limit = 4192 # SQS max body size / 2. reliable_purge = False @@ -15,10 +15,10 @@ class test_SQS(transport.TransportCase): # even in simple cases. def before_connect(self): - if "AWS_ACCESS_KEY_ID" not in os.environ: - raise SkipTest("Missing envvar AWS_ACCESS_KEY_ID") - if "AWS_SECRET_ACCESS_KEY" not in os.environ: - raise SkipTest("Missing envvar AWS_SECRET_ACCESS_KEY") + if 'AWS_ACCESS_KEY_ID' not in os.environ: + raise SkipTest('Missing envvar AWS_ACCESS_KEY_ID') + if 'AWS_SECRET_ACCESS_KEY' not in os.environ: + raise SkipTest('Missing envvar AWS_SECRET_ACCESS_KEY') def after_connect(self, connection): connection.channel().sqs diff --git a/funtests/tests/test_amqplib.py b/funtests/tests/test_amqplib.py index ec91250d..fc5b0d35 100644 --- a/funtests/tests/test_amqplib.py +++ b/funtests/tests/test_amqplib.py @@ -2,5 +2,5 @@ from funtests import transport class test_amqplib(transport.TransportCase): - transport = "amqplib" - prefix = "amqplib" + transport = 'amqplib' + prefix = 'amqplib' diff --git a/funtests/tests/test_beanstalk.py b/funtests/tests/test_beanstalk.py index 5af87f7f..b0601edd 100644 --- a/funtests/tests/test_beanstalk.py +++ b/funtests/tests/test_beanstalk.py @@ -2,8 +2,8 @@ from funtests import transport class test_beanstalk(transport.TransportCase): - transport = "beanstalk" - prefix = "beanstalk" + transport = 'beanstalk' + prefix = 'beanstalk' event_loop_max = 10 message_size_limit = 47662 diff --git a/funtests/tests/test_couchdb.py b/funtests/tests/test_couchdb.py index a6c5d6df..633e17b6 100644 --- a/funtests/tests/test_couchdb.py +++ b/funtests/tests/test_couchdb.py @@ -2,8 +2,8 @@ from funtests import transport class test_couchdb(transport.TransportCase): - transport = "couchdb" - prefix = "couchdb" + transport = 'couchdb' + prefix = 'couchdb' event_loop_max = 100 def after_connect(self, connection): diff --git a/funtests/tests/test_django.py b/funtests/tests/test_django.py index 86f551f3..7f509ea7 100644 --- a/funtests/tests/test_django.py +++ b/funtests/tests/test_django.py @@ -6,8 +6,8 @@ from funtests import transport class test_django(transport.TransportCase): - transport = "django" - prefix = "django" + transport = 'django' + prefix = 'django' event_loop_max = 10 def before_connect(self): @@ -17,16 +17,16 @@ class test_django(transport.TransportCase): try: import djkombu # noqa except ImportError: - raise SkipTest("django-kombu not installed") + raise SkipTest('django-kombu not installed') from django.conf import settings if not settings.configured: - settings.configure(DATABASE_ENGINE="sqlite3", - DATABASE_NAME=":memory:", - DATABASES={"default": { - "ENGINE": "django.db.backends.sqlite3", - "NAME": ":memory:"}}, - INSTALLED_APPS=("djkombu", )) + settings.configure(DATABASE_ENGINE='sqlite3', + DATABASE_NAME=':memory:', + DATABASES={'default': { + 'ENGINE': 'django.db.backends.sqlite3', + 'NAME': ':memory:'}}, + INSTALLED_APPS=('djkombu', )) from django.core.management import call_command - call_command("syncdb") + call_command('syncdb') setup_django() diff --git a/funtests/tests/test_mongodb.py b/funtests/tests/test_mongodb.py index c51e3a5a..630c1c59 100644 --- a/funtests/tests/test_mongodb.py +++ b/funtests/tests/test_mongodb.py @@ -5,8 +5,8 @@ from funtests import transport class test_mongodb(transport.TransportCase): - transport = "mongodb" - prefix = "mongodb" + transport = 'mongodb' + prefix = 'mongodb' event_loop_max = 100 def after_connect(self, connection): @@ -14,11 +14,11 @@ class test_mongodb(transport.TransportCase): self.c = self.connection # shortcut - def test_fanout(self, name="test_mongodb_fanout"): + def test_fanout(self, name='test_mongodb_fanout'): c = self.connection - self.e = Exchange(name, type="fanout") + self.e = Exchange(name, type='fanout') self.q = Queue(name, exchange=self.e, routing_key=name) - self.q2 = Queue(name + "2", exchange=self.e, routing_key=name + "2") + self.q2 = Queue(name + '2', exchange=self.e, routing_key=name + '2') channel = c.default_channel producer = Producer(channel, self.e) @@ -27,9 +27,9 @@ class test_mongodb(transport.TransportCase): self.q2(channel).declare() for i in xrange(10): - producer.publish({"foo": i}, routing_key=name) + producer.publish({'foo': i}, routing_key=name) for i in xrange(10): - producer.publish({"foo": i}, routing_key=name + "2") + producer.publish({'foo': i}, routing_key=name + '2') _received1 = [] _received2 = [] @@ -55,7 +55,7 @@ class test_mongodb(transport.TransportCase): # queue.delete for i in xrange(10): - producer.publish({"foo": i}, routing_key=name) + producer.publish({'foo': i}, routing_key=name) self.assertTrue(self.q(channel).get()) self.q(channel).delete() self.q(channel).declare() @@ -63,7 +63,7 @@ class test_mongodb(transport.TransportCase): # queue.purge for i in xrange(10): - producer.publish({"foo": i}, routing_key=name + "2") + producer.publish({'foo': i}, routing_key=name + '2') self.assertTrue(self.q2(channel).get()) self.q2(channel).purge() self.assertIsNone(self.q2(channel).get()) diff --git a/funtests/tests/test_pyamqp.py b/funtests/tests/test_pyamqp.py index 8d493a80..5ca0c2e0 100644 --- a/funtests/tests/test_pyamqp.py +++ b/funtests/tests/test_pyamqp.py @@ -2,5 +2,5 @@ from funtests import transport class test_pyamqp(transport.TransportCase): - transport = "pyamqp" - prefix = "pyamqp" + transport = 'pyamqp' + prefix = 'pyamqp' diff --git a/funtests/tests/test_redis.py b/funtests/tests/test_redis.py index 83aef8b8..50d84e2d 100644 --- a/funtests/tests/test_redis.py +++ b/funtests/tests/test_redis.py @@ -2,8 +2,8 @@ from funtests import transport class test_redis(transport.TransportCase): - transport = "redis" - prefix = "redis" + transport = 'redis' + prefix = 'redis' def after_connect(self, connection): client = connection.channel().client diff --git a/funtests/tests/test_sqla.py b/funtests/tests/test_sqla.py index 2a01a4a2..49d3ea1a 100644 --- a/funtests/tests/test_sqla.py +++ b/funtests/tests/test_sqla.py @@ -4,13 +4,13 @@ from funtests import transport class test_sqla(transport.TransportCase): - transport = "sqlalchemy" - prefix = "sqlalchemy" + transport = 'sqlalchemy' + prefix = 'sqlalchemy' event_loop_max = 10 - connection_options = {"hostname": "sqlite://"} + connection_options = {'hostname': 'sqlite://'} def before_connect(self): try: import sqlakombu # noqa except ImportError: - raise SkipTest("kombu-sqlalchemy not installed") + raise SkipTest('kombu-sqlalchemy not installed') diff --git a/funtests/tests/test_zookeeper.py b/funtests/tests/test_zookeeper.py index 0d1274ef..b30eee38 100644 --- a/funtests/tests/test_zookeeper.py +++ b/funtests/tests/test_zookeeper.py @@ -2,8 +2,8 @@ from funtests import transport class test_zookeeper(transport.TransportCase): - transport = "zookeeper" - prefix = "zookeeper" + transport = 'zookeeper' + prefix = 'zookeeper' event_loop_max = 100 def after_connect(self, connection): diff --git a/funtests/transport.py b/funtests/transport.py index 1b4e65ef..434fb2b7 100644 --- a/funtests/transport.py +++ b/funtests/transport.py @@ -20,7 +20,7 @@ else: def say(msg): - sys.stderr.write(unicode(msg) + "\n") + sys.stderr.write(unicode(msg) + '\n') def consumeN(conn, consumer, n=1, timeout=30): @@ -39,7 +39,7 @@ def consumeN(conn, consumer, n=1, timeout=30): conn.drain_events(timeout=1) except socket.timeout: seconds += 1 - msg = "Received %s/%s messages. %s seconds passed." % ( + msg = 'Received %s/%s messages. %s seconds passed.' % ( len(messages), n, seconds) if seconds >= timeout: raise socket.timeout(msg) @@ -83,7 +83,7 @@ class TransportCase(unittest.TestCase): self.skip_test_reason = str(exc) else: self.do_connect() - self.exchange = Exchange(self.prefix, "direct") + self.exchange = Exchange(self.prefix, 'direct') self.queue = Queue(self.prefix, self.exchange, self.prefix) def purge(self, names): @@ -101,9 +101,9 @@ class TransportCase(unittest.TestCase): def get_connection(self, **options): if self.userid: - options.setdefault("userid", self.userid) + options.setdefault('userid', self.userid) if self.password: - options.setdefault("password", self.password) + options.setdefault('password', self.password) return Connection(transport=self.transport, **options) def do_connect(self): @@ -112,7 +112,7 @@ class TransportCase(unittest.TestCase): self.connection.connect() self.after_connect(self.connection) except self.connection.connection_errors: - self.skip_test_reason = "%s transport can't connect" % ( + self.skip_test_reason = '%s transport cannot connect' % ( self.transport, ) else: self.connected = True @@ -133,9 +133,9 @@ class TransportCase(unittest.TestCase): consumer = chan1.Consumer(self.queue) self.purge_consumer(consumer) producer = chan1.Producer(self.exchange) - producer.publish({"foo": "bar"}, routing_key=self.prefix) + producer.publish({'foo': 'bar'}, routing_key=self.prefix) message = consumeN(self.connection, consumer) - self.assertDictEqual(message[0], {"foo": "bar"}) + self.assertDictEqual(message[0], {'foo': 'bar'}) chan1.close() self.purge([self.queue.name]) @@ -148,7 +148,7 @@ class TransportCase(unittest.TestCase): producer = chan1.Producer(self.exchange) for i in xrange(10): - producer.publish({"foo": "bar"}, routing_key=self.prefix) + producer.publish({'foo': 'bar'}, routing_key=self.prefix) if self.reliable_purge: self.assertEqual(consumer.purge(), 10) self.assertEqual(consumer.purge(), 0) @@ -167,8 +167,8 @@ class TransportCase(unittest.TestCase): if not self.verify_alive(): return bytes = min(filter(None, [bytes, self.message_size_limit])) - messages = ["".join(random.choice(charset) - for j in xrange(bytes)) + "--%s" % n + messages = [''.join(random.choice(charset) + for j in xrange(bytes)) + '--%s' % n for i in xrange(n)] digests = [] chan1 = self.connection.channel() @@ -176,22 +176,22 @@ class TransportCase(unittest.TestCase): self.purge_consumer(consumer) producer = chan1.Producer(self.exchange) for i, message in enumerate(messages): - producer.publish({"text": message, - "i": i}, routing_key=self.prefix) + producer.publish({'text': message, + 'i': i}, routing_key=self.prefix) digests.append(self._digest(message)) - received = [(msg["i"], msg["text"]) + received = [(msg['i'], msg['text']) for msg in consumeN(self.connection, consumer, n)] self.assertEqual(len(received), n) ordering = [i for i, _ in received] if ordering != range(n) and not self.suppress_disorder_warning: warnings.warn( - "%s did not deliver messages in FIFO order: %r" % ( + '%s did not deliver messages in FIFO order: %r' % ( self.transport, ordering)) for i, text in received: if text != messages[i]: - raise AssertionError("%i: %r is not %r" % ( + raise AssertionError('%i: %r is not %r' % ( i, text[-100:], messages[i][-100:])) self.assertEqual(self._digest(text), digests[i]) @@ -199,30 +199,30 @@ class TransportCase(unittest.TestCase): self.purge([self.queue.name]) def P(self, rest): - return "%s%s%s" % (self.prefix, self.sep, rest) + return '%s%s%s' % (self.prefix, self.sep, rest) def test_produce__consume_multiple(self): if not self.verify_alive(): return chan1 = self.connection.channel() producer = chan1.Producer(self.exchange) - b1 = Queue(self.P("b1"), self.exchange, "b1")(chan1) - b2 = Queue(self.P("b2"), self.exchange, "b2")(chan1) - b3 = Queue(self.P("b3"), self.exchange, "b3")(chan1) + b1 = Queue(self.P('b1'), self.exchange, 'b1')(chan1) + b2 = Queue(self.P('b2'), self.exchange, 'b2')(chan1) + b3 = Queue(self.P('b3'), self.exchange, 'b3')(chan1) [q.declare() for q in (b1, b2, b3)] self.purge([b1.name, b2.name, b3.name]) - producer.publish("b1", routing_key="b1") - producer.publish("b2", routing_key="b2") - producer.publish("b3", routing_key="b3") + producer.publish('b1', routing_key='b1') + producer.publish('b2', routing_key='b2') + producer.publish('b3', routing_key='b3') chan1.close() chan2 = self.connection.channel() consumer = chan2.Consumer([b1, b2, b3]) messages = consumeN(self.connection, consumer, 3) - self.assertItemsEqual(messages, ["b1", "b2", "b3"]) + self.assertItemsEqual(messages, ['b1', 'b2', 'b3']) chan2.close() - self.purge([self.P("b1"), self.P("b2"), self.P("b3")]) + self.purge([self.P('b1'), self.P('b2'), self.P('b3')]) def test_timeout(self): if not self.verify_alive(): @@ -242,10 +242,10 @@ class TransportCase(unittest.TestCase): chan1 = self.connection.channel() producer = chan1.Producer(self.exchange) chan2 = self.connection.channel() - queue = Queue(self.P("basic_get"), self.exchange, "basic_get") + queue = Queue(self.P('basic_get'), self.exchange, 'basic_get') queue = queue(chan2) queue.declare() - producer.publish({"basic.get": "this"}, routing_key="basic_get") + producer.publish({'basic.get': 'this'}, routing_key='basic_get') chan1.close() for i in range(self.event_loop_max): @@ -253,7 +253,7 @@ class TransportCase(unittest.TestCase): if m: break time.sleep(0.1) - self.assertEqual(m.payload, {"basic.get": "this"}) + self.assertEqual(m.payload, {'basic.get': 'this'}) self.purge([queue.name]) chan2.close() diff --git a/kombu/entity.py b/kombu/entity.py index 32b199c3..682a2628 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -123,6 +123,7 @@ class Exchange(MaybeChannelBound): type = 'direct' durable = True auto_delete = False + passive = False delivery_mode = PERSISTENT_DELIVERY_MODE attrs = ( @@ -130,6 +131,7 @@ class Exchange(MaybeChannelBound): ('type', None), ('arguments', None), ('durable', bool), + ('passive', bool), ('auto_delete', bool), ('delivery_mode', lambda m: DELIVERY_MODES.get(m) or m), ) @@ -143,7 +145,7 @@ class Exchange(MaybeChannelBound): def __hash__(self): return hash('E|%s' % (self.name, )) - def declare(self, nowait=False, passive=False): + def declare(self, nowait=False, passive=None): """Declare the exchange. Creates the exchange on the broker. @@ -152,6 +154,7 @@ class Exchange(MaybeChannelBound): response will not be waited for. Default is :const:`False`. """ + passive = self.passive if passive is None else passive if self.name: return self.channel.exchange_declare( exchange=self.name, type=self.type, durable=self.durable, @@ -541,8 +544,8 @@ class Queue(MaybeChannelBound): Returns the message instance if a message was available, or :const:`None` otherwise. - :keyword no_ack: If set messages received does not have to - be acknowledged. + :keyword no_ack: If enabled the broker will automatically + ack messages. This method provides direct access to the messages in a queue using a synchronous dialogue, designed for @@ -575,8 +578,8 @@ class Queue(MaybeChannelBound): can use the same consumer tags. If this field is empty the server will generate a unique tag. - :keyword no_ack: If set messages received does not have to - be acknowledged. + :keyword no_ack: If enabled the broker will automatically ack + messages. :keyword nowait: Do not wait for a reply. diff --git a/kombu/messaging.py b/kombu/messaging.py index a235da2e..b7de2396 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -280,8 +280,12 @@ class Consumer(object): #: consume from. queues = None - #: Flag for message acknowledgment disabled/enabled. - #: Enabled by default. + #: Flag for automatic message acknowledgment. + #: If enabled the messages are automatically acknowledged by the + #: broker. This can increase performance but means that you + #: have no control of when the message is removed. + #: + #: Disabled by default. no_ack = None #: By default all entities will be declared at instantiation, if you @@ -403,6 +407,12 @@ class Consumer(object): pass def add_queue(self, queue): + """Add a queue to the list of queues to consume from. + + This will not start consuming from the queue, + for that you will have to call :meth:`consume` after. + + """ queue = queue(self.channel) if self.auto_declare: queue.declare() @@ -410,9 +420,26 @@ class Consumer(object): return queue def add_queue_from_dict(self, queue, **options): + """This method is deprecated. + + Instead please use:: + + consumer.add_queue(Queue.from_dict(d)) + + """ return self.add_queue(Queue.from_dict(queue, **options)) def consume(self, no_ack=None): + """Start consuming messages. + + Can be called multiple times, but note that while it + will consume from new queues added since the last call, + it will not cancel consuming from removed queues ( + use :meth:`cancel_by_queue`). + + :param no_ack: See :attr:`no_ack`. + + """ if self.queues: no_ack = self.no_ack if no_ack is None else no_ack @@ -445,10 +472,12 @@ class Consumer(object): self.channel.basic_cancel(tag) def consuming_from(self, queue): + """Returns :const:`True` if the consumer is currently + consuming from queue'.""" name = queue if isinstance(queue, Queue): name = queue.name - return any(q.name == name for q in self.queues) + return name in self._active_tags def purge(self): """Purge messages from all queues. diff --git a/kombu/tests/__init__.py b/kombu/tests/__init__.py index ad8a62c3..82bf6bb1 100644 --- a/kombu/tests/__init__.py +++ b/kombu/tests/__init__.py @@ -1,6 +1,7 @@ from __future__ import absolute_import import anyjson +import atexit import os import sys @@ -16,6 +17,21 @@ except ImportError: anyjson.force_implementation('simplejson') +def teardown(): + # Workaround for multiprocessing bug where logging + # is attempted after global already collected at shutdown. + cancelled = set() + try: + import multiprocessing.util + cancelled.add(multiprocessing.util._exit_function) + except (AttributeError, ImportError): + pass + + atexit._exithandlers[:] = [ + e for e in atexit._exithandlers if e[0] not in cancelled + ] + + def find_distribution_modules(name=__name__, file=__file__): current_dist_depth = len(name.split('.')) - 1 current_dist = os.path.join(os.path.dirname(file), diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py index efd27e8c..20f4446e 100644 --- a/kombu/tests/test_entities.py +++ b/kombu/tests/test_entities.py @@ -15,7 +15,6 @@ from .utils import Mock def get_conn(): return Connection(transport=Transport) - class test_binding(TestCase): def test_constructor(self): @@ -129,6 +128,10 @@ class test_Exchange(TestCase): exc = Exchange('foo', 'direct', delivery_mode='transient') self.assertEqual(exc.delivery_mode, Exchange.TRANSIENT_DELIVERY_MODE) + def test_set_passive_mode(self): + exc = Exchange('foo', 'direct', passive=True) + self.assertTrue(exc.passive) + def test_set_persistent_delivery_mode(self): exc = Exchange('foo', 'direct', delivery_mode='persistent') self.assertEqual(exc.delivery_mode, Exchange.PERSISTENT_DELIVERY_MODE) diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index 296b02a2..9069d6fe 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -258,9 +258,13 @@ class test_Consumer(TestCase): def test_consuming_from(self): consumer = self.connection.Consumer() - consumer.queues[:] = [Queue('a'), Queue('b')] + consumer.queues[:] = [Queue('a'), Queue('b'), Queue('d')] + consumer._active_tags = {'a': 1, 'b': 2} + self.assertFalse(consumer.consuming_from(Queue('c'))) self.assertFalse(consumer.consuming_from('c')) + self.assertFalse(consumer.consuming_from(Queue('d'))) + self.assertFalse(consumer.consuming_from('d')) self.assertTrue(consumer.consuming_from(Queue('a'))) self.assertTrue(consumer.consuming_from(Queue('b'))) self.assertTrue(consumer.consuming_from('b')) diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py index e3211d65..ee6246c1 100644 --- a/kombu/transport/librabbitmq.py +++ b/kombu/transport/librabbitmq.py @@ -29,6 +29,10 @@ from . import base DEFAULT_PORT = 5672 +NO_SSL_ERROR = """\ +ssl not supported by librabbitmq, please use pyamqp:// or stunnel\ +""" + class Message(base.Message): @@ -99,6 +103,8 @@ class Transport(base.Transport): for name, default_value in items(self.default_connection_params): if not getattr(conninfo, name, None): setattr(conninfo, name, default_value) + if conninfo.ssl: + raise NotImplementedError(NO_SSL_ERROR) conn = self.Connection(host=conninfo.host, userid=conninfo.userid, password=conninfo.password, diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index a6fed4d9..7c9c0606 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -205,7 +205,7 @@ class MultiChannelPoller(object): for fd in values(self._chan_to_sock): try: self.poller.unregister(fd) - except KeyError: + except (KeyError, ValueError): pass self._channels.clear() self._fd_to_chan.clear() diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py index 2bd23c5b..0cebee2f 100644 --- a/kombu/utils/eventio.py +++ b/kombu/utils/eventio.py @@ -10,7 +10,7 @@ from __future__ import absolute_import import errno import socket -from select import select as _selectf +from select import select as _selectf, error as _selecterr try: from select import epoll @@ -53,6 +53,11 @@ READ = POLL_READ = 0x001 WRITE = POLL_WRITE = 0x004 ERR = POLL_ERR = 0x008 | 0x010 +try: + SELECT_BAD_FD = set((errno.EBADF, errno.WSAENOTSOCK)) +except AttributeError: + SELECT_BAD_FD = set((errno.EBADF,)) + class Poller(object): @@ -79,11 +84,9 @@ class _epoll(Poller): def unregister(self, fd): try: self._epoll.unregister(fd) - except socket.error: + except (socket.error, ValueError, KeyError): pass - except ValueError: - pass - except IOError as exc: + except (IOError, OSError) as exc: if get_errno(exc) != errno.ENOENT: raise @@ -191,13 +194,30 @@ class _select(Poller): if events & READ: self._rfd.add(fd) + def _remove_bad(self): + for fd in self._rfd | self._wfd | self._efd: + try: + _selectf([fd], [], [], 0) + except _selecterr, exc: + if get_errno(exc) in SELECT_BAD_FD: + self.unregister(fd) + def unregister(self, fd): self._rfd.discard(fd) self._wfd.discard(fd) self._efd.discard(fd) def _poll(self, timeout): - read, write, error = _selectf(self._rfd, self._wfd, self._efd, timeout) + try: + read, write, error = _selectf( + self._rfd, self._wfd, self._efd, timeout, + ) + except _selecterr, exc: + if get_errno(exc) == errno.EINTR: + return + elif get_errno(exc) in SELECT_BAD_FD: + self._remove_bad() + events = {} for fd in read: if not isinstance(fd, int):