diff --git a/README.rst b/README.rst index 83256847..7e238f65 100644 --- a/README.rst +++ b/README.rst @@ -81,31 +81,31 @@ and the `Wikipedia article about AMQP`_. Transport Comparison ==================== -+---------------+----------+------------+------------+---------------+ -| **Client** | **Type** | **Direct** | **Topic** | **Fanout** | -+---------------+----------+------------+------------+---------------+ -| *amqp* | Native | Yes | Yes | Yes | -+---------------+----------+------------+------------+---------------+ -| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) | -+---------------+----------+------------+------------+---------------+ -| *mongodb* | Virtual | Yes | Yes | Yes | -+---------------+----------+------------+------------+---------------+ -| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No | -+---------------+----------+------------+------------+---------------+ -| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | -+---------------+----------+------------+------------+---------------+ -| *couchdb* | Virtual | Yes | Yes [#f1]_ | No | -+---------------+----------+------------+------------+---------------+ -| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | -+---------------+----------+------------+------------+---------------+ -| *in-memory* | Virtual | Yes | Yes [#f1]_ | No | -+---------------+----------+------------+------------+---------------+ -| *django* | Virtual | Yes | Yes [#f1]_ | No | -+---------------+----------+------------+------------+---------------+ -| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No | -+---------------+----------+------------+------------+---------------+ -| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No | -+---------------+----------+------------+------------+---------------+ ++---------------+----------+------------+------------+---------------+--------------+ +| **Client** | **Type** | **Direct** | **Topic** | **Fanout** | **Priority** | ++---------------+----------+------------+------------+---------------+--------------+ +| *amqp* | Native | Yes | Yes | Yes | Yes [#f3]_ | ++---------------+----------+------------+------------+---------------+--------------+ +| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) | Yes | ++---------------+----------+------------+------------+---------------+--------------+ +| *mongodb* | Virtual | Yes | Yes | Yes | Yes | ++---------------+----------+------------+------------+---------------+--------------+ +| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No | Yes | ++---------------+----------+------------+------------+---------------+--------------+ +| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | No | ++---------------+----------+------------+------------+---------------+--------------+ +| *couchdb* | Virtual | Yes | Yes [#f1]_ | No | No | ++---------------+----------+------------+------------+---------------+--------------+ +| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | Yes | ++---------------+----------+------------+------------+---------------+--------------+ +| *in-memory* | Virtual | Yes | Yes [#f1]_ | No | No | ++---------------+----------+------------+------------+---------------+--------------+ +| *django* | Virtual | Yes | Yes [#f1]_ | No | No | ++---------------+----------+------------+------------+---------------+--------------+ +| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No | No | ++---------------+----------+------------+------------+---------------+--------------+ +| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No | No | ++---------------+----------+------------+------------+---------------+--------------+ .. [#f1] Declarations only kept in memory, so exchanges/queues @@ -115,6 +115,7 @@ Transport Comparison Disabled by default, but can be enabled by using the ``supports_fanout`` transport option. +.. [#f3] AMQP Message priority support depends on broker implementation. Documentation ------------- diff --git a/docs/userguide/connections.rst b/docs/userguide/connections.rst index f97b4b79..38037a29 100644 --- a/docs/userguide/connections.rst +++ b/docs/userguide/connections.rst @@ -145,29 +145,31 @@ transport URL, or use ``amqp`` to have the fallback. Transport Comparison ==================== -+---------------+----------+------------+------------+---------------+ -| **Client** | **Type** | **Direct** | **Topic** | **Fanout** | -+---------------+----------+------------+------------+---------------+ -| *amqp* | Native | Yes | Yes | Yes | -+---------------+----------+------------+------------+---------------+ -| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) | -+---------------+----------+------------+------------+---------------+ -| *mongodb* | Virtual | Yes | Yes | Yes | -+---------------+----------+------------+------------+---------------+ -| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No | -+---------------+----------+------------+------------+---------------+ -| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | -+---------------+----------+------------+------------+---------------+ -| *couchdb* | Virtual | Yes | Yes [#f1]_ | No | -+---------------+----------+------------+------------+---------------+ -| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | -+---------------+----------+------------+------------+---------------+ -| *in-memory* | Virtual | Yes | Yes [#f1]_ | No | -+---------------+----------+------------+------------+---------------+ -| *django* | Virtual | Yes | Yes [#f1]_ | No | -+---------------+----------+------------+------------+---------------+ -| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No | -+---------------+----------+------------+------------+---------------+ ++---------------+----------+------------+------------+---------------+--------------+ +| **Client** | **Type** | **Direct** | **Topic** | **Fanout** | **Priority** | ++---------------+----------+------------+------------+---------------+--------------+ +| *amqp* | Native | Yes | Yes | Yes | Yes [#f3]_ | ++---------------+----------+------------+------------+---------------+--------------+ +| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) | Yes | ++---------------+----------+------------+------------+---------------+--------------+ +| *mongodb* | Virtual | Yes | Yes | Yes | Yes | ++---------------+----------+------------+------------+---------------+--------------+ +| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No | Yes | ++---------------+----------+------------+------------+---------------+--------------+ +| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | No | ++---------------+----------+------------+------------+---------------+--------------+ +| *couchdb* | Virtual | Yes | Yes [#f1]_ | No | No | ++---------------+----------+------------+------------+---------------+--------------+ +| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | Yes | ++---------------+----------+------------+------------+---------------+--------------+ +| *in-memory* | Virtual | Yes | Yes [#f1]_ | No | No | ++---------------+----------+------------+------------+---------------+--------------+ +| *django* | Virtual | Yes | Yes [#f1]_ | No | No | ++---------------+----------+------------+------------+---------------+--------------+ +| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No | No | ++---------------+----------+------------+------------+---------------+--------------+ +| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No | No | ++---------------+----------+------------+------------+---------------+--------------+ .. [#f1] Declarations only kept in memory, so exchanges/queues @@ -176,3 +178,5 @@ Transport Comparison .. [#f2] Fanout supported via storing routing tables in SimpleDB. Disabled by default, but can be enabled by using the ``supports_fanout`` transport option. + +.. [#f3] AMQP Message priority support depends on broker implementation. diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py index fbe8f064..0627aed5 100644 --- a/kombu/tests/transport/virtual/test_base.py +++ b/kombu/tests/transport/virtual/test_base.py @@ -515,6 +515,22 @@ class test_Channel(Case): with self.assertRaises(ChannelError): self.channel.queue_declare(queue='21wisdjwqe', passive=True) + def test_get_message_priority(self): + def _message(priority): + return self.channel.prepare_message('the message with priority', + priority=priority) + + self.assertEqual(self.channel._get_message_priority(_message(5)), + 5) + self.assertEqual(self.channel._get_message_priority(_message(self.channel.min_priority - 10)), + self.channel.min_priority) + self.assertEqual(self.channel._get_message_priority(_message(self.channel.max_priority + 10)), + self.channel.max_priority) + self.assertEqual(self.channel._get_message_priority(_message('foobar')), + self.channel.default_priority) + self.assertEqual(self.channel._get_message_priority(_message(2), reverse=True), + self.channel.max_priority - 2) + class test_Transport(Case): diff --git a/kombu/transport/beanstalk.py b/kombu/transport/beanstalk.py index 9dff8b49..544fd438 100644 --- a/kombu/transport/beanstalk.py +++ b/kombu/transport/beanstalk.py @@ -44,7 +44,7 @@ class Channel(virtual.Channel): def _put(self, queue, message, **kwargs): extra = {} - priority = message['properties']['delivery_info']['priority'] + priority = self._get_message_priority(message) ttr = message['properties'].get('ttr') if ttr is not None: extra['ttr'] = ttr diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 78af0f9f..5f726d09 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -99,7 +99,8 @@ class Channel(virtual.Channel): else: msg = self.get_messages().find_and_modify( query={'queue': queue}, - sort={'_id': pymongo.ASCENDING}, + sort=[('priority', pymongo.ASCENDING), + ('_id', pymongo.ASCENDING)], remove=True, ) @@ -116,7 +117,9 @@ class Channel(virtual.Channel): def _put(self, queue, message, **kwargs): self.get_messages().insert({'payload': dumps(message), - 'queue': queue}) + 'queue': queue, + 'priority': self._get_message_priority(message, + reverse=True)}) def _purge(self, queue): size = self._size(queue) @@ -202,7 +205,7 @@ class Channel(virtual.Channel): def _ensure_indexes(self): '''Ensure indexes on collections.''' self.get_messages().ensure_index( - [('queue', 1), ('_id', 1)], background=True, + [('queue', 1), ('priority', 1), ('_id', 1)], background=True, ) self.get_broadcast().ensure_index([('queue', 1)]) self.get_routing().ensure_index([('queue', 1), ('exchange', 1)]) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index b93fa7af..6609292e 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -661,11 +661,8 @@ class Channel(virtual.Channel): def _put(self, queue, message, **kwargs): """Deliver message.""" - try: - pri = max(min(int( - message['properties']['delivery_info']['priority']), 9), 0) - except (TypeError, ValueError, KeyError): - pri = 0 + pri = self._get_message_priority(message) + with self.conn_or_acquire() as client: client.lpush(self._q_for_pri(queue, pri), dumps(message)) diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 5a05386e..21bbbe23 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -371,6 +371,11 @@ class Channel(AbstractChannel, base.StdChannel): # List of options to transfer from :attr:`transport_options`. from_transport_options = ('body_encoding', 'deadletter_queue') + # Priority defaults + default_priority = 0 + min_priority = 0 + max_priority = 9 + def __init__(self, connection, **kwargs): self.connection = connection self._consumers = set() @@ -657,7 +662,7 @@ class Channel(AbstractChannel, base.StdChannel): """Prepare message data.""" properties = properties or {} info = properties.setdefault('delivery_info', {}) - info['priority'] = priority or 0 + info['priority'] = priority or self.default_priority return {'body': body, 'content-encoding': content_encoding, @@ -727,6 +732,19 @@ class Channel(AbstractChannel, base.StdChannel): self._reset_cycle() return self._cycle + def _get_message_priority(self, message, reverse=False): + """Gets priority from message and converts it to the bounds: [0, 9]. + Higher value has more priority. + """ + try: + priority = max(min(int(message['properties']['delivery_info']['priority']), + self.max_priority), + self.min_priority) + except (TypeError, ValueError, KeyError): + priority = self.default_priority + + return (self.max_priority - priority) if reverse else priority + class Management(base.Management): diff --git a/kombu/transport/zookeeper.py b/kombu/transport/zookeeper.py index 2d1c8abc..6645507a 100644 --- a/kombu/transport/zookeeper.py +++ b/kombu/transport/zookeeper.py @@ -37,7 +37,6 @@ from kombu.utils.encoding import bytes_to_str from . import virtual -MAX_PRIORITY = 9 try: import kazoo @@ -103,13 +102,8 @@ class Channel(virtual.Channel): return queue def _put(self, queue, message, **kwargs): - try: - priority = message['properties']['delivery_info']['priority'] - except KeyError: - priority = 0 - queue = self._get_queue(queue) - queue.put(dumps(message), priority=(MAX_PRIORITY - priority)) + queue.put(dumps(message), priority=self._get_message_priority(message, reverse=True)) def _get(self, queue): queue = self._get_queue(queue)