Merge branch 'daevaorn/priority'

This commit is contained in:
Ask Solem 2014-05-20 19:30:05 +01:00
commit bd6c5bed2b
8 changed files with 98 additions and 65 deletions

View File

@ -81,31 +81,31 @@ and the `Wikipedia article about AMQP`_.
Transport Comparison
====================
+---------------+----------+------------+------------+---------------+
| **Client** | **Type** | **Direct** | **Topic** | **Fanout** |
+---------------+----------+------------+------------+---------------+
| *amqp* | Native | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+
| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) |
+---------------+----------+------------+------------+---------------+
| *mongodb* | Virtual | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+
| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ |
+---------------+----------+------------+------------+---------------+
| *couchdb* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *in-memory* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *django* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
+---------------+----------+------------+------------+---------------+--------------+
| **Client** | **Type** | **Direct** | **Topic** | **Fanout** | **Priority** |
+---------------+----------+------------+------------+---------------+--------------+
| *amqp* | Native | Yes | Yes | Yes | Yes [#f3]_ |
+---------------+----------+------------+------------+---------------+--------------+
| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *mongodb* | Virtual | Yes | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | No |
+---------------+----------+------------+------------+---------------+--------------+
| *couchdb* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *in-memory* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *django* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
.. [#f1] Declarations only kept in memory, so exchanges/queues
@ -115,6 +115,7 @@ Transport Comparison
Disabled by default, but can be enabled by using the
``supports_fanout`` transport option.
.. [#f3] AMQP Message priority support depends on broker implementation.
Documentation
-------------

View File

@ -145,29 +145,31 @@ transport URL, or use ``amqp`` to have the fallback.
Transport Comparison
====================
+---------------+----------+------------+------------+---------------+
| **Client** | **Type** | **Direct** | **Topic** | **Fanout** |
+---------------+----------+------------+------------+---------------+
| *amqp* | Native | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+
| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) |
+---------------+----------+------------+------------+---------------+
| *mongodb* | Virtual | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+
| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ |
+---------------+----------+------------+------------+---------------+
| *couchdb* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *in-memory* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *django* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
+---------------+----------+------------+------------+---------------+--------------+
| **Client** | **Type** | **Direct** | **Topic** | **Fanout** | **Priority** |
+---------------+----------+------------+------------+---------------+--------------+
| *amqp* | Native | Yes | Yes | Yes | Yes [#f3]_ |
+---------------+----------+------------+------------+---------------+--------------+
| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *mongodb* | Virtual | Yes | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | No |
+---------------+----------+------------+------------+---------------+--------------+
| *couchdb* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *in-memory* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *django* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
.. [#f1] Declarations only kept in memory, so exchanges/queues
@ -176,3 +178,5 @@ Transport Comparison
.. [#f2] Fanout supported via storing routing tables in SimpleDB.
Disabled by default, but can be enabled by using the
``supports_fanout`` transport option.
.. [#f3] AMQP Message priority support depends on broker implementation.

View File

@ -515,6 +515,22 @@ class test_Channel(Case):
with self.assertRaises(ChannelError):
self.channel.queue_declare(queue='21wisdjwqe', passive=True)
def test_get_message_priority(self):
def _message(priority):
return self.channel.prepare_message('the message with priority',
priority=priority)
self.assertEqual(self.channel._get_message_priority(_message(5)),
5)
self.assertEqual(self.channel._get_message_priority(_message(self.channel.min_priority - 10)),
self.channel.min_priority)
self.assertEqual(self.channel._get_message_priority(_message(self.channel.max_priority + 10)),
self.channel.max_priority)
self.assertEqual(self.channel._get_message_priority(_message('foobar')),
self.channel.default_priority)
self.assertEqual(self.channel._get_message_priority(_message(2), reverse=True),
self.channel.max_priority - 2)
class test_Transport(Case):

View File

@ -44,7 +44,7 @@ class Channel(virtual.Channel):
def _put(self, queue, message, **kwargs):
extra = {}
priority = message['properties']['delivery_info']['priority']
priority = self._get_message_priority(message)
ttr = message['properties'].get('ttr')
if ttr is not None:
extra['ttr'] = ttr

View File

@ -99,7 +99,8 @@ class Channel(virtual.Channel):
else:
msg = self.get_messages().find_and_modify(
query={'queue': queue},
sort={'_id': pymongo.ASCENDING},
sort=[('priority', pymongo.ASCENDING),
('_id', pymongo.ASCENDING)],
remove=True,
)
@ -116,7 +117,9 @@ class Channel(virtual.Channel):
def _put(self, queue, message, **kwargs):
self.get_messages().insert({'payload': dumps(message),
'queue': queue})
'queue': queue,
'priority': self._get_message_priority(message,
reverse=True)})
def _purge(self, queue):
size = self._size(queue)
@ -202,7 +205,7 @@ class Channel(virtual.Channel):
def _ensure_indexes(self):
'''Ensure indexes on collections.'''
self.get_messages().ensure_index(
[('queue', 1), ('_id', 1)], background=True,
[('queue', 1), ('priority', 1), ('_id', 1)], background=True,
)
self.get_broadcast().ensure_index([('queue', 1)])
self.get_routing().ensure_index([('queue', 1), ('exchange', 1)])

View File

@ -661,11 +661,8 @@ class Channel(virtual.Channel):
def _put(self, queue, message, **kwargs):
"""Deliver message."""
try:
pri = max(min(int(
message['properties']['delivery_info']['priority']), 9), 0)
except (TypeError, ValueError, KeyError):
pri = 0
pri = self._get_message_priority(message)
with self.conn_or_acquire() as client:
client.lpush(self._q_for_pri(queue, pri), dumps(message))

View File

@ -371,6 +371,11 @@ class Channel(AbstractChannel, base.StdChannel):
# List of options to transfer from :attr:`transport_options`.
from_transport_options = ('body_encoding', 'deadletter_queue')
# Priority defaults
default_priority = 0
min_priority = 0
max_priority = 9
def __init__(self, connection, **kwargs):
self.connection = connection
self._consumers = set()
@ -657,7 +662,7 @@ class Channel(AbstractChannel, base.StdChannel):
"""Prepare message data."""
properties = properties or {}
info = properties.setdefault('delivery_info', {})
info['priority'] = priority or 0
info['priority'] = priority or self.default_priority
return {'body': body,
'content-encoding': content_encoding,
@ -727,6 +732,19 @@ class Channel(AbstractChannel, base.StdChannel):
self._reset_cycle()
return self._cycle
def _get_message_priority(self, message, reverse=False):
"""Gets priority from message and converts it to the bounds: [0, 9].
Higher value has more priority.
"""
try:
priority = max(min(int(message['properties']['delivery_info']['priority']),
self.max_priority),
self.min_priority)
except (TypeError, ValueError, KeyError):
priority = self.default_priority
return (self.max_priority - priority) if reverse else priority
class Management(base.Management):

View File

@ -37,7 +37,6 @@ from kombu.utils.encoding import bytes_to_str
from . import virtual
MAX_PRIORITY = 9
try:
import kazoo
@ -103,13 +102,8 @@ class Channel(virtual.Channel):
return queue
def _put(self, queue, message, **kwargs):
try:
priority = message['properties']['delivery_info']['priority']
except KeyError:
priority = 0
queue = self._get_queue(queue)
queue.put(dumps(message), priority=(MAX_PRIORITY - priority))
queue.put(dumps(message), priority=self._get_message_priority(message, reverse=True))
def _get(self, queue):
queue = self._get_queue(queue)