Merge branch '2.5'

Conflicts:
	Changelog
	kombu/utils/eventio.py
This commit is contained in:
Ask Solem 2013-04-17 15:19:29 +01:00
commit 4146118de3
23 changed files with 217 additions and 115 deletions

View File

@ -20,6 +20,7 @@ C Anthony Risinger <anthony+corvisa.com@xtfx.me>
Christophe Chauvet <christophe.chauvet@gmail.com>
Christopher Grebs <cg@webshox.org>
Clay Gerrard <clay.gerrard@gmail.com>
Dan LaMotte <lamotte85@gmail.com>
Dan McGee <dan@archlinux.org>
Dane Guempel <daneguempel@gmail.com>
David Clymer <david@zettazebra.com>
@ -61,6 +62,7 @@ Petar Radosevic <petar@wunki.org>
Peter Hoffmann <tosh54@gmail.com>
Pierre Riteau <priteau@ci.uchicago.edu>
Rafael Duran Castaneda <rafadurancastaneda@gmail.com>
Rafal Malinowski <malinowski@red-sky.pl>
Ralf Nyren <ralf-github@nyren.net>
Rob Ottaway <robottaway@gmail.com>
Rumyana Neykova <rumi.neykova@gmail.com>

View File

@ -19,6 +19,27 @@
>>> from kombu import enable_insecure_serializers
>>> enable_insecure_serializers()
.. _version-2.5.11:
2.5.11
======
:release-date: TBA
- Adds ``passive`` option to :class:`~kombu.Exchange`.
Contributed by Rafal Malinowski
- librabbitmq: Now raises :exc:`NotImplementedError`
if ``ssl`` option enabled.
The librabbitmq library does not support ssl,
but you can use stunnel or change to the ``pyamqp://`` transport
instead.
Fix contributed by Dan LaMotte.
- eventio: select implementation now removes bad file descriptors.
.. _version-2.5.10:
2.5.10

View File

@ -31,38 +31,38 @@ But you can execute the tests by running the command:
setup(
name='kombu-funtests',
version="DEV",
description="Functional test suite for Kombu",
author="Ask Solem",
author_email="ask@celeryproject.org",
url="http://github.com/celery/kombu",
platforms=["any"],
version='DEV',
description='Functional test suite for Kombu',
author='Ask Solem',
author_email='ask@celeryproject.org',
url='http://github.com/celery/kombu',
platforms=['any'],
packages=[],
data_files=[],
zip_safe=False,
cmdclass={"install": no_install},
test_suite="nose.collector",
cmdclass={'install': no_install},
test_suite='nose.collector',
build_requires=[
"nose",
"nose-cover3",
"unittest2",
"coverage>=3.0",
"simplejson",
"PyYAML",
"msgpack-python",
"pymongo",
"couchdb",
"kazoo",
"beanstalkc",
"kombu-sqlalchemy",
"django",
"django-kombu",
'nose',
'nose-cover3',
'unittest2',
'coverage>=3.0',
'simplejson',
'PyYAML',
'msgpack-python',
'pymongo',
'couchdb',
'kazoo',
'beanstalkc',
'kombu-sqlalchemy',
'django',
'django-kombu',
],
classifiers=[
"Operating System :: OS Independent",
"Programming Language :: Python",
"License :: OSI Approved :: BSD License",
"Intended Audience :: Developers",
'Operating System :: OS Independent',
'Programming Language :: Python',
'License :: OSI Approved :: BSD License',
'Intended Audience :: Developers',
],
long_description="Do not install this package",
long_description='Do not install this package',
)

View File

@ -1,8 +1,6 @@
import os
import sys
print("HELLO")
sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
print(sys.path[0])
sys.path.insert(0, os.getcwd())

View File

@ -6,8 +6,8 @@ from funtests import transport
class test_SQS(transport.TransportCase):
transport = "SQS"
prefix = "sqs"
transport = 'SQS'
prefix = 'sqs'
event_loop_max = 100
message_size_limit = 4192 # SQS max body size / 2.
reliable_purge = False
@ -15,10 +15,10 @@ class test_SQS(transport.TransportCase):
# even in simple cases.
def before_connect(self):
if "AWS_ACCESS_KEY_ID" not in os.environ:
raise SkipTest("Missing envvar AWS_ACCESS_KEY_ID")
if "AWS_SECRET_ACCESS_KEY" not in os.environ:
raise SkipTest("Missing envvar AWS_SECRET_ACCESS_KEY")
if 'AWS_ACCESS_KEY_ID' not in os.environ:
raise SkipTest('Missing envvar AWS_ACCESS_KEY_ID')
if 'AWS_SECRET_ACCESS_KEY' not in os.environ:
raise SkipTest('Missing envvar AWS_SECRET_ACCESS_KEY')
def after_connect(self, connection):
connection.channel().sqs

View File

@ -2,5 +2,5 @@ from funtests import transport
class test_amqplib(transport.TransportCase):
transport = "amqplib"
prefix = "amqplib"
transport = 'amqplib'
prefix = 'amqplib'

View File

@ -2,8 +2,8 @@ from funtests import transport
class test_beanstalk(transport.TransportCase):
transport = "beanstalk"
prefix = "beanstalk"
transport = 'beanstalk'
prefix = 'beanstalk'
event_loop_max = 10
message_size_limit = 47662

View File

@ -2,8 +2,8 @@ from funtests import transport
class test_couchdb(transport.TransportCase):
transport = "couchdb"
prefix = "couchdb"
transport = 'couchdb'
prefix = 'couchdb'
event_loop_max = 100
def after_connect(self, connection):

View File

@ -6,8 +6,8 @@ from funtests import transport
class test_django(transport.TransportCase):
transport = "django"
prefix = "django"
transport = 'django'
prefix = 'django'
event_loop_max = 10
def before_connect(self):
@ -17,16 +17,16 @@ class test_django(transport.TransportCase):
try:
import djkombu # noqa
except ImportError:
raise SkipTest("django-kombu not installed")
raise SkipTest('django-kombu not installed')
from django.conf import settings
if not settings.configured:
settings.configure(DATABASE_ENGINE="sqlite3",
DATABASE_NAME=":memory:",
DATABASES={"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": ":memory:"}},
INSTALLED_APPS=("djkombu", ))
settings.configure(DATABASE_ENGINE='sqlite3',
DATABASE_NAME=':memory:',
DATABASES={'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': ':memory:'}},
INSTALLED_APPS=('djkombu', ))
from django.core.management import call_command
call_command("syncdb")
call_command('syncdb')
setup_django()

View File

@ -5,8 +5,8 @@ from funtests import transport
class test_mongodb(transport.TransportCase):
transport = "mongodb"
prefix = "mongodb"
transport = 'mongodb'
prefix = 'mongodb'
event_loop_max = 100
def after_connect(self, connection):
@ -14,11 +14,11 @@ class test_mongodb(transport.TransportCase):
self.c = self.connection # shortcut
def test_fanout(self, name="test_mongodb_fanout"):
def test_fanout(self, name='test_mongodb_fanout'):
c = self.connection
self.e = Exchange(name, type="fanout")
self.e = Exchange(name, type='fanout')
self.q = Queue(name, exchange=self.e, routing_key=name)
self.q2 = Queue(name + "2", exchange=self.e, routing_key=name + "2")
self.q2 = Queue(name + '2', exchange=self.e, routing_key=name + '2')
channel = c.default_channel
producer = Producer(channel, self.e)
@ -27,9 +27,9 @@ class test_mongodb(transport.TransportCase):
self.q2(channel).declare()
for i in xrange(10):
producer.publish({"foo": i}, routing_key=name)
producer.publish({'foo': i}, routing_key=name)
for i in xrange(10):
producer.publish({"foo": i}, routing_key=name + "2")
producer.publish({'foo': i}, routing_key=name + '2')
_received1 = []
_received2 = []
@ -55,7 +55,7 @@ class test_mongodb(transport.TransportCase):
# queue.delete
for i in xrange(10):
producer.publish({"foo": i}, routing_key=name)
producer.publish({'foo': i}, routing_key=name)
self.assertTrue(self.q(channel).get())
self.q(channel).delete()
self.q(channel).declare()
@ -63,7 +63,7 @@ class test_mongodb(transport.TransportCase):
# queue.purge
for i in xrange(10):
producer.publish({"foo": i}, routing_key=name + "2")
producer.publish({'foo': i}, routing_key=name + '2')
self.assertTrue(self.q2(channel).get())
self.q2(channel).purge()
self.assertIsNone(self.q2(channel).get())

View File

@ -2,5 +2,5 @@ from funtests import transport
class test_pyamqp(transport.TransportCase):
transport = "pyamqp"
prefix = "pyamqp"
transport = 'pyamqp'
prefix = 'pyamqp'

View File

@ -2,8 +2,8 @@ from funtests import transport
class test_redis(transport.TransportCase):
transport = "redis"
prefix = "redis"
transport = 'redis'
prefix = 'redis'
def after_connect(self, connection):
client = connection.channel().client

View File

@ -4,13 +4,13 @@ from funtests import transport
class test_sqla(transport.TransportCase):
transport = "sqlalchemy"
prefix = "sqlalchemy"
transport = 'sqlalchemy'
prefix = 'sqlalchemy'
event_loop_max = 10
connection_options = {"hostname": "sqlite://"}
connection_options = {'hostname': 'sqlite://'}
def before_connect(self):
try:
import sqlakombu # noqa
except ImportError:
raise SkipTest("kombu-sqlalchemy not installed")
raise SkipTest('kombu-sqlalchemy not installed')

View File

@ -2,8 +2,8 @@ from funtests import transport
class test_zookeeper(transport.TransportCase):
transport = "zookeeper"
prefix = "zookeeper"
transport = 'zookeeper'
prefix = 'zookeeper'
event_loop_max = 100
def after_connect(self, connection):

View File

@ -20,7 +20,7 @@ else:
def say(msg):
sys.stderr.write(unicode(msg) + "\n")
sys.stderr.write(unicode(msg) + '\n')
def consumeN(conn, consumer, n=1, timeout=30):
@ -39,7 +39,7 @@ def consumeN(conn, consumer, n=1, timeout=30):
conn.drain_events(timeout=1)
except socket.timeout:
seconds += 1
msg = "Received %s/%s messages. %s seconds passed." % (
msg = 'Received %s/%s messages. %s seconds passed.' % (
len(messages), n, seconds)
if seconds >= timeout:
raise socket.timeout(msg)
@ -83,7 +83,7 @@ class TransportCase(unittest.TestCase):
self.skip_test_reason = str(exc)
else:
self.do_connect()
self.exchange = Exchange(self.prefix, "direct")
self.exchange = Exchange(self.prefix, 'direct')
self.queue = Queue(self.prefix, self.exchange, self.prefix)
def purge(self, names):
@ -101,9 +101,9 @@ class TransportCase(unittest.TestCase):
def get_connection(self, **options):
if self.userid:
options.setdefault("userid", self.userid)
options.setdefault('userid', self.userid)
if self.password:
options.setdefault("password", self.password)
options.setdefault('password', self.password)
return Connection(transport=self.transport, **options)
def do_connect(self):
@ -112,7 +112,7 @@ class TransportCase(unittest.TestCase):
self.connection.connect()
self.after_connect(self.connection)
except self.connection.connection_errors:
self.skip_test_reason = "%s transport can't connect" % (
self.skip_test_reason = '%s transport cannot connect' % (
self.transport, )
else:
self.connected = True
@ -133,9 +133,9 @@ class TransportCase(unittest.TestCase):
consumer = chan1.Consumer(self.queue)
self.purge_consumer(consumer)
producer = chan1.Producer(self.exchange)
producer.publish({"foo": "bar"}, routing_key=self.prefix)
producer.publish({'foo': 'bar'}, routing_key=self.prefix)
message = consumeN(self.connection, consumer)
self.assertDictEqual(message[0], {"foo": "bar"})
self.assertDictEqual(message[0], {'foo': 'bar'})
chan1.close()
self.purge([self.queue.name])
@ -148,7 +148,7 @@ class TransportCase(unittest.TestCase):
producer = chan1.Producer(self.exchange)
for i in xrange(10):
producer.publish({"foo": "bar"}, routing_key=self.prefix)
producer.publish({'foo': 'bar'}, routing_key=self.prefix)
if self.reliable_purge:
self.assertEqual(consumer.purge(), 10)
self.assertEqual(consumer.purge(), 0)
@ -167,8 +167,8 @@ class TransportCase(unittest.TestCase):
if not self.verify_alive():
return
bytes = min(filter(None, [bytes, self.message_size_limit]))
messages = ["".join(random.choice(charset)
for j in xrange(bytes)) + "--%s" % n
messages = [''.join(random.choice(charset)
for j in xrange(bytes)) + '--%s' % n
for i in xrange(n)]
digests = []
chan1 = self.connection.channel()
@ -176,22 +176,22 @@ class TransportCase(unittest.TestCase):
self.purge_consumer(consumer)
producer = chan1.Producer(self.exchange)
for i, message in enumerate(messages):
producer.publish({"text": message,
"i": i}, routing_key=self.prefix)
producer.publish({'text': message,
'i': i}, routing_key=self.prefix)
digests.append(self._digest(message))
received = [(msg["i"], msg["text"])
received = [(msg['i'], msg['text'])
for msg in consumeN(self.connection, consumer, n)]
self.assertEqual(len(received), n)
ordering = [i for i, _ in received]
if ordering != range(n) and not self.suppress_disorder_warning:
warnings.warn(
"%s did not deliver messages in FIFO order: %r" % (
'%s did not deliver messages in FIFO order: %r' % (
self.transport, ordering))
for i, text in received:
if text != messages[i]:
raise AssertionError("%i: %r is not %r" % (
raise AssertionError('%i: %r is not %r' % (
i, text[-100:], messages[i][-100:]))
self.assertEqual(self._digest(text), digests[i])
@ -199,30 +199,30 @@ class TransportCase(unittest.TestCase):
self.purge([self.queue.name])
def P(self, rest):
return "%s%s%s" % (self.prefix, self.sep, rest)
return '%s%s%s' % (self.prefix, self.sep, rest)
def test_produce__consume_multiple(self):
if not self.verify_alive():
return
chan1 = self.connection.channel()
producer = chan1.Producer(self.exchange)
b1 = Queue(self.P("b1"), self.exchange, "b1")(chan1)
b2 = Queue(self.P("b2"), self.exchange, "b2")(chan1)
b3 = Queue(self.P("b3"), self.exchange, "b3")(chan1)
b1 = Queue(self.P('b1'), self.exchange, 'b1')(chan1)
b2 = Queue(self.P('b2'), self.exchange, 'b2')(chan1)
b3 = Queue(self.P('b3'), self.exchange, 'b3')(chan1)
[q.declare() for q in (b1, b2, b3)]
self.purge([b1.name, b2.name, b3.name])
producer.publish("b1", routing_key="b1")
producer.publish("b2", routing_key="b2")
producer.publish("b3", routing_key="b3")
producer.publish('b1', routing_key='b1')
producer.publish('b2', routing_key='b2')
producer.publish('b3', routing_key='b3')
chan1.close()
chan2 = self.connection.channel()
consumer = chan2.Consumer([b1, b2, b3])
messages = consumeN(self.connection, consumer, 3)
self.assertItemsEqual(messages, ["b1", "b2", "b3"])
self.assertItemsEqual(messages, ['b1', 'b2', 'b3'])
chan2.close()
self.purge([self.P("b1"), self.P("b2"), self.P("b3")])
self.purge([self.P('b1'), self.P('b2'), self.P('b3')])
def test_timeout(self):
if not self.verify_alive():
@ -242,10 +242,10 @@ class TransportCase(unittest.TestCase):
chan1 = self.connection.channel()
producer = chan1.Producer(self.exchange)
chan2 = self.connection.channel()
queue = Queue(self.P("basic_get"), self.exchange, "basic_get")
queue = Queue(self.P('basic_get'), self.exchange, 'basic_get')
queue = queue(chan2)
queue.declare()
producer.publish({"basic.get": "this"}, routing_key="basic_get")
producer.publish({'basic.get': 'this'}, routing_key='basic_get')
chan1.close()
for i in range(self.event_loop_max):
@ -253,7 +253,7 @@ class TransportCase(unittest.TestCase):
if m:
break
time.sleep(0.1)
self.assertEqual(m.payload, {"basic.get": "this"})
self.assertEqual(m.payload, {'basic.get': 'this'})
self.purge([queue.name])
chan2.close()

View File

@ -123,6 +123,7 @@ class Exchange(MaybeChannelBound):
type = 'direct'
durable = True
auto_delete = False
passive = False
delivery_mode = PERSISTENT_DELIVERY_MODE
attrs = (
@ -130,6 +131,7 @@ class Exchange(MaybeChannelBound):
('type', None),
('arguments', None),
('durable', bool),
('passive', bool),
('auto_delete', bool),
('delivery_mode', lambda m: DELIVERY_MODES.get(m) or m),
)
@ -143,7 +145,7 @@ class Exchange(MaybeChannelBound):
def __hash__(self):
return hash('E|%s' % (self.name, ))
def declare(self, nowait=False, passive=False):
def declare(self, nowait=False, passive=None):
"""Declare the exchange.
Creates the exchange on the broker.
@ -152,6 +154,7 @@ class Exchange(MaybeChannelBound):
response will not be waited for. Default is :const:`False`.
"""
passive = self.passive if passive is None else passive
if self.name:
return self.channel.exchange_declare(
exchange=self.name, type=self.type, durable=self.durable,
@ -541,8 +544,8 @@ class Queue(MaybeChannelBound):
Returns the message instance if a message was available,
or :const:`None` otherwise.
:keyword no_ack: If set messages received does not have to
be acknowledged.
:keyword no_ack: If enabled the broker will automatically
ack messages.
This method provides direct access to the messages in a
queue using a synchronous dialogue, designed for
@ -575,8 +578,8 @@ class Queue(MaybeChannelBound):
can use the same consumer tags. If this field is empty
the server will generate a unique tag.
:keyword no_ack: If set messages received does not have to
be acknowledged.
:keyword no_ack: If enabled the broker will automatically ack
messages.
:keyword nowait: Do not wait for a reply.

View File

@ -280,8 +280,12 @@ class Consumer(object):
#: consume from.
queues = None
#: Flag for message acknowledgment disabled/enabled.
#: Enabled by default.
#: Flag for automatic message acknowledgment.
#: If enabled the messages are automatically acknowledged by the
#: broker. This can increase performance but means that you
#: have no control of when the message is removed.
#:
#: Disabled by default.
no_ack = None
#: By default all entities will be declared at instantiation, if you
@ -403,6 +407,12 @@ class Consumer(object):
pass
def add_queue(self, queue):
"""Add a queue to the list of queues to consume from.
This will not start consuming from the queue,
for that you will have to call :meth:`consume` after.
"""
queue = queue(self.channel)
if self.auto_declare:
queue.declare()
@ -410,9 +420,26 @@ class Consumer(object):
return queue
def add_queue_from_dict(self, queue, **options):
"""This method is deprecated.
Instead please use::
consumer.add_queue(Queue.from_dict(d))
"""
return self.add_queue(Queue.from_dict(queue, **options))
def consume(self, no_ack=None):
"""Start consuming messages.
Can be called multiple times, but note that while it
will consume from new queues added since the last call,
it will not cancel consuming from removed queues (
use :meth:`cancel_by_queue`).
:param no_ack: See :attr:`no_ack`.
"""
if self.queues:
no_ack = self.no_ack if no_ack is None else no_ack
@ -445,10 +472,12 @@ class Consumer(object):
self.channel.basic_cancel(tag)
def consuming_from(self, queue):
"""Returns :const:`True` if the consumer is currently
consuming from queue'."""
name = queue
if isinstance(queue, Queue):
name = queue.name
return any(q.name == name for q in self.queues)
return name in self._active_tags
def purge(self):
"""Purge messages from all queues.

View File

@ -1,6 +1,7 @@
from __future__ import absolute_import
import anyjson
import atexit
import os
import sys
@ -16,6 +17,21 @@ except ImportError:
anyjson.force_implementation('simplejson')
def teardown():
# Workaround for multiprocessing bug where logging
# is attempted after global already collected at shutdown.
cancelled = set()
try:
import multiprocessing.util
cancelled.add(multiprocessing.util._exit_function)
except (AttributeError, ImportError):
pass
atexit._exithandlers[:] = [
e for e in atexit._exithandlers if e[0] not in cancelled
]
def find_distribution_modules(name=__name__, file=__file__):
current_dist_depth = len(name.split('.')) - 1
current_dist = os.path.join(os.path.dirname(file),

View File

@ -15,7 +15,6 @@ from .utils import Mock
def get_conn():
return Connection(transport=Transport)
class test_binding(TestCase):
def test_constructor(self):
@ -129,6 +128,10 @@ class test_Exchange(TestCase):
exc = Exchange('foo', 'direct', delivery_mode='transient')
self.assertEqual(exc.delivery_mode, Exchange.TRANSIENT_DELIVERY_MODE)
def test_set_passive_mode(self):
exc = Exchange('foo', 'direct', passive=True)
self.assertTrue(exc.passive)
def test_set_persistent_delivery_mode(self):
exc = Exchange('foo', 'direct', delivery_mode='persistent')
self.assertEqual(exc.delivery_mode, Exchange.PERSISTENT_DELIVERY_MODE)

View File

@ -258,9 +258,13 @@ class test_Consumer(TestCase):
def test_consuming_from(self):
consumer = self.connection.Consumer()
consumer.queues[:] = [Queue('a'), Queue('b')]
consumer.queues[:] = [Queue('a'), Queue('b'), Queue('d')]
consumer._active_tags = {'a': 1, 'b': 2}
self.assertFalse(consumer.consuming_from(Queue('c')))
self.assertFalse(consumer.consuming_from('c'))
self.assertFalse(consumer.consuming_from(Queue('d')))
self.assertFalse(consumer.consuming_from('d'))
self.assertTrue(consumer.consuming_from(Queue('a')))
self.assertTrue(consumer.consuming_from(Queue('b')))
self.assertTrue(consumer.consuming_from('b'))

View File

@ -29,6 +29,10 @@ from . import base
DEFAULT_PORT = 5672
NO_SSL_ERROR = """\
ssl not supported by librabbitmq, please use pyamqp:// or stunnel\
"""
class Message(base.Message):
@ -99,6 +103,8 @@ class Transport(base.Transport):
for name, default_value in items(self.default_connection_params):
if not getattr(conninfo, name, None):
setattr(conninfo, name, default_value)
if conninfo.ssl:
raise NotImplementedError(NO_SSL_ERROR)
conn = self.Connection(host=conninfo.host,
userid=conninfo.userid,
password=conninfo.password,

View File

@ -205,7 +205,7 @@ class MultiChannelPoller(object):
for fd in values(self._chan_to_sock):
try:
self.poller.unregister(fd)
except KeyError:
except (KeyError, ValueError):
pass
self._channels.clear()
self._fd_to_chan.clear()

View File

@ -10,7 +10,7 @@ from __future__ import absolute_import
import errno
import socket
from select import select as _selectf
from select import select as _selectf, error as _selecterr
try:
from select import epoll
@ -53,6 +53,11 @@ READ = POLL_READ = 0x001
WRITE = POLL_WRITE = 0x004
ERR = POLL_ERR = 0x008 | 0x010
try:
SELECT_BAD_FD = set((errno.EBADF, errno.WSAENOTSOCK))
except AttributeError:
SELECT_BAD_FD = set((errno.EBADF,))
class Poller(object):
@ -79,11 +84,9 @@ class _epoll(Poller):
def unregister(self, fd):
try:
self._epoll.unregister(fd)
except socket.error:
except (socket.error, ValueError, KeyError):
pass
except ValueError:
pass
except IOError as exc:
except (IOError, OSError) as exc:
if get_errno(exc) != errno.ENOENT:
raise
@ -191,13 +194,30 @@ class _select(Poller):
if events & READ:
self._rfd.add(fd)
def _remove_bad(self):
for fd in self._rfd | self._wfd | self._efd:
try:
_selectf([fd], [], [], 0)
except _selecterr, exc:
if get_errno(exc) in SELECT_BAD_FD:
self.unregister(fd)
def unregister(self, fd):
self._rfd.discard(fd)
self._wfd.discard(fd)
self._efd.discard(fd)
def _poll(self, timeout):
read, write, error = _selectf(self._rfd, self._wfd, self._efd, timeout)
try:
read, write, error = _selectf(
self._rfd, self._wfd, self._efd, timeout,
)
except _selecterr, exc:
if get_errno(exc) == errno.EINTR:
return
elif get_errno(exc) in SELECT_BAD_FD:
self._remove_bad()
events = {}
for fd in read:
if not isinstance(fd, int):