Adds Queue.expires,.message_ttl,.max_length,.max_length_bytes,.max_priority (Issue #629)

This commit is contained in:
Ask Solem 2016-10-14 16:53:03 -07:00
parent 42c8b17d49
commit 3dec40a007
8 changed files with 199 additions and 11 deletions

View File

@ -42,7 +42,7 @@ class Object(object):
def f(obj, type):
if recurse and isinstance(obj, Object):
return obj.as_dict(recurse=True)
return type(obj) if type else obj
return type(obj) if type and obj is not None else obj
return {
attr: f(getattr(self, attr), type) for attr, type in self.attrs
}

View File

@ -380,8 +380,13 @@ class Queue(MaybeChannelBound):
queue_arguments (Dict): See :attr:`queue_arguments`.
binding_arguments (Dict): See :attr:`binding_arguments`.
consumer_arguments (Dict): See :attr:`consumer_arguments`.
no_declare (bool): See :attr:`no_declare`
on_declared (Callable): See :attr:`on_declared`
no_declare (bool): See :attr:`no_declare`.
on_declared (Callable): See :attr:`on_declared`.
expires (float): See :attr:`expires`.
message_ttl (float): See :attr:`message_ttl`.
max_length (int): See :attr:`max_length`.
max_length_bytes (int): See :attr:`max_length_bytes`.
max_priority (int): See :attr:`max_priority`.
Attributes:
name (str): Name of the queue.
@ -437,6 +442,63 @@ class Queue(MaybeChannelBound):
there was no consumer ever on the queue, it won't be
deleted.
expires (float): Set the expiry time (in seconds) for when this
queue should expire.
The expiry time decides how long the queue can stay unused
before it's automatically deleted.
*Unused* means the queue has no consumers, the queue has not been
redeclared, and ``Queue.get`` has not been invoked for a duration
of at least the expiration period.
See https://www.rabbitmq.com/ttl.html#queue-ttl
**RabbitMQ extension**: Only available when using RabbitMQ.
message_ttl (float): Message time to live in seconds.
This setting controls how long messages can stay in the queue
unconsumed. If the expiry time passes before a message consumer
has received the message, the message is deleted and no consumer
will see the message.
See https://www.rabbitmq.com/ttl.html#per-queue-message-ttl
**RabbitMQ extension**: Only available when using RabbitMQ.
max_length (int): Set the maximum number of messages that the
queue can hold.
If the number of messages in the queue size exceeds this limit,
new messages will be dropped (or dead-lettered if a dead letter
exchange is active).
See https://www.rabbitmq.com/maxlength.html
**RabbitMQ extension**: Only available when using RabbitMQ.
max_length_bytes (int): Set the max size (in bytes) for the total
of messages in the queue.
If the total size of all the messages in the queue exceeds this
limit, new messages will be dropped (or dead-lettered if a dead
letter exchange is active).
**RabbitMQ extension**: Only available when using RabbitMQ.
max_priority (int): Set the highest priority number for this queue.
For example if the value is 10, then messages can delivered to
this queue can have a ``priority`` value between 0 and 10,
where 10 is the highest priority.
RabbitMQ queues without a max priority set will ignore
the priority field in the message, so if you want priorities
you need to set the max priority field to declare the queue
as a priority queue.
**RabbitMQ extension**: Only available when using RabbitMQ.
queue_arguments (Dict): Additional arguments used when declaring
the queue. Can be used to to set the arguments value
for RabbitMQ/AMQP's ``queue.declare``.
@ -488,6 +550,11 @@ class Queue(MaybeChannelBound):
('alias', None),
('bindings', list),
('no_declare', bool),
('expires', float),
('message_ttl', float),
('max_length', int),
('max_length_bytes', int),
('max_priority', int)
)
def __init__(self, name='', exchange=None, routing_key='',
@ -556,13 +623,24 @@ class Queue(MaybeChannelBound):
The client can use this to check whether a queue exists
without modifying the server state.
"""
ret = self.channel.queue_declare(queue=self.name,
passive=passive,
durable=self.durable,
exclusive=self.exclusive,
auto_delete=self.auto_delete,
arguments=self.queue_arguments,
nowait=nowait)
queue_arguments = self.channel.prepare_queue_arguments(
self.queue_arguments or {},
expires=self.expires,
message_ttl=self.message_ttl,
max_length=self.max_length,
max_length_bytes=self.max_length_bytes,
max_priority=self.max_priority,
)
print('QUEUE ARGUMENTS: %r' % (queue_arguments,))
ret = self.channel.queue_declare(
queue=self.name,
passive=passive,
durable=self.durable,
exclusive=self.exclusive,
auto_delete=self.auto_delete,
arguments=queue_arguments,
nowait=nowait,
)
if not self.name:
self.name = ret[0]
if self.on_declared:

View File

@ -7,11 +7,62 @@ import socket
from amqp.exceptions import RecoverableConnectionError
from kombu.exceptions import ChannelError, ConnectionError
from kombu.five import items
from kombu.message import Message
from kombu.utils.functional import dictfilter
from kombu.utils.objects import cached_property
from kombu.utils.time import maybe_s_to_ms
__all__ = ['Message', 'StdChannel', 'Management', 'Transport']
RABBITMQ_QUEUE_ARGUMENTS = { # type: Mapping[str, Tuple[str, Callable]]
'expires': ('x-expires', maybe_s_to_ms),
'message_ttl': ('x-message-ttl', maybe_s_to_ms),
'max_length': ('x-max-length', int),
'max_length_bytes': ('x-max-length-bytes', int),
'max_priority': ('x-max-priority', int),
}
def to_rabbitmq_queue_arguments(arguments, **options):
# type: (Mapping, **Any) -> Dict
"""Convert queue arguments to RabbitMQ queue arguments.
This is the implementation for Channel.prepare_queue_arguments
for AMQP-based transports. It's used by both the pyamqp and librabbitmq
transports.
Arguments:
arguments (Mapping):
User-supplied arguments (``Queue.queue_arguments``).
Keyword Arguments:
expires (float): Queue expiry time in seconds.
This will be converted to ``x-expires`` in int milliseconds.
message_ttl (float): Message TTL in seconds.
This will be converted to ``x-message-ttl`` in int milliseconds.
max_length (int): Max queue length (in number of messages).
This will be converted to ``x-max-length`` int.
max_length_bytes (int): Max queue size in bytes.
This will be converted to ``x-max-length-bytes`` int.
max_priority (int): Max priority steps for queue.
This will be converted to ``x-max-priority`` int.
Returns:
Dict: RabbitMQ compatible queue arguments.
"""
prepared = dictfilter(dict(
_to_rabbitmq_queue_argument(key, value)
for key, value in items(options)
))
return dict(arguments, **prepared) if prepared else arguments
def _to_rabbitmq_queue_argument(key, value):
# type: (str, Any) -> Tuple[str, Any]
opt, typ = RABBITMQ_QUEUE_ARGUMENTS[key]
return opt, typ(value) if value is not None else value
def _LeftBlank(obj, method):
return NotImplementedError(
@ -44,6 +95,9 @@ class StdChannel(object):
"""
pass
def prepare_queue_arguments(self, arguments, **kwargs):
return arguments
def __enter__(self):
return self

View File

@ -16,6 +16,7 @@ from kombu.utils.amq_manager import get_manager
from kombu.utils.text import version_string_as_tuple
from . import base
from .base import to_rabbitmq_queue_arguments
W_VERSION = """
librabbitmq version too old to detect RabbitMQ version information
@ -60,6 +61,9 @@ class Channel(amqp.Channel, base.StdChannel):
'priority': priority})
return body, properties
def prepare_queue_arguments(self, arguments, **kwargs):
return to_rabbitmq_queue_arguments(arguments, **kwargs)
class Connection(amqp.Connection):
"""AMQP Connection (librabbitmq)."""

View File

@ -8,6 +8,7 @@ from kombu.utils.amq_manager import get_manager
from kombu.utils.text import version_string_as_tuple
from . import base
from .base import to_rabbitmq_queue_arguments
DEFAULT_PORT = 5672
DEFAULT_SSL_PORT = 5671
@ -48,6 +49,9 @@ class Channel(amqp.Channel, base.StdChannel):
**properties or {}
)
def prepare_queue_arguments(self, arguments, **kwargs):
return to_rabbitmq_queue_arguments(arguments, **kwargs)
def message_to_python(self, raw_message):
"""Convert encoded message body back to a Python value."""
return self.Message(self, raw_message)

10
kombu/utils/time.py Normal file
View File

@ -0,0 +1,10 @@
"""Time Utilities."""
from __future__ import absolute_import, unicode_literals
__all__ = ['maybe_s_to_ms']
def maybe_s_to_ms(v):
# type: (Optional[Union[int, float]]) -> int
"""Convert seconds to milliseconds, but return None for None."""
return int(float(v) * 1000.0) if v is not None else v

View File

@ -7,7 +7,22 @@ from case import Mock
from kombu import Connection, Consumer, Exchange, Producer, Queue
from kombu.five import text_t
from kombu.message import Message
from kombu.transport.base import StdChannel, Transport, Management
from kombu.transport.base import (
StdChannel, Transport, Management, to_rabbitmq_queue_arguments,
)
@pytest.mark.parametrize('args,input,expected', [
({}, {'message_ttl': 20}, {'x-message-ttl': 20000}),
({}, {'message_ttl': None}, {}),
({'foo': 'bar'}, {'expires': 30.3}, {'x-expires': 30300, 'foo': 'bar'}),
({'x-expires': 3}, {'expires': 4}, {'x-expires': 4000}),
({}, {'max_length': 10}, {'x-max-length': 10}),
({}, {'max_length_bytes': 1033}, {'x-max-length-bytes': 1033}),
({}, {'max_priority': 303}, {'x-max-priority': 303}),
])
def test_rabbitmq_queue_arguments(args, input, expected):
assert to_rabbitmq_queue_arguments(args, **input) == expected
class test_StdChannel:

23
t/unit/utils/test_time.py Normal file
View File

@ -0,0 +1,23 @@
from __future__ import absolute_import, unicode_literals
import pytest
from kombu.utils.time import maybe_s_to_ms
@pytest.mark.parametrize('input,expected', [
(3, 3000),
(3.0, 3000),
(303, 303000),
(303.33, 303330),
(303.333, 303333),
(303.3334, 303333),
(None, None),
(0, 0),
])
def test_maybe_s_to_ms(input, expected):
ret = maybe_s_to_ms(input)
if expected is None:
assert ret is None
else:
assert ret == expected