diff --git a/kombu/abstract.py b/kombu/abstract.py index b8c80c0b..39516970 100644 --- a/kombu/abstract.py +++ b/kombu/abstract.py @@ -42,7 +42,7 @@ class Object(object): def f(obj, type): if recurse and isinstance(obj, Object): return obj.as_dict(recurse=True) - return type(obj) if type else obj + return type(obj) if type and obj is not None else obj return { attr: f(getattr(self, attr), type) for attr, type in self.attrs } diff --git a/kombu/entity.py b/kombu/entity.py index f84c2fe9..e418e808 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -380,8 +380,13 @@ class Queue(MaybeChannelBound): queue_arguments (Dict): See :attr:`queue_arguments`. binding_arguments (Dict): See :attr:`binding_arguments`. consumer_arguments (Dict): See :attr:`consumer_arguments`. - no_declare (bool): See :attr:`no_declare` - on_declared (Callable): See :attr:`on_declared` + no_declare (bool): See :attr:`no_declare`. + on_declared (Callable): See :attr:`on_declared`. + expires (float): See :attr:`expires`. + message_ttl (float): See :attr:`message_ttl`. + max_length (int): See :attr:`max_length`. + max_length_bytes (int): See :attr:`max_length_bytes`. + max_priority (int): See :attr:`max_priority`. Attributes: name (str): Name of the queue. @@ -437,6 +442,63 @@ class Queue(MaybeChannelBound): there was no consumer ever on the queue, it won't be deleted. + expires (float): Set the expiry time (in seconds) for when this + queue should expire. + + The expiry time decides how long the queue can stay unused + before it's automatically deleted. + *Unused* means the queue has no consumers, the queue has not been + redeclared, and ``Queue.get`` has not been invoked for a duration + of at least the expiration period. + + See https://www.rabbitmq.com/ttl.html#queue-ttl + + **RabbitMQ extension**: Only available when using RabbitMQ. + + message_ttl (float): Message time to live in seconds. + + This setting controls how long messages can stay in the queue + unconsumed. If the expiry time passes before a message consumer + has received the message, the message is deleted and no consumer + will see the message. + + See https://www.rabbitmq.com/ttl.html#per-queue-message-ttl + + **RabbitMQ extension**: Only available when using RabbitMQ. + + max_length (int): Set the maximum number of messages that the + queue can hold. + + If the number of messages in the queue size exceeds this limit, + new messages will be dropped (or dead-lettered if a dead letter + exchange is active). + + See https://www.rabbitmq.com/maxlength.html + + **RabbitMQ extension**: Only available when using RabbitMQ. + + max_length_bytes (int): Set the max size (in bytes) for the total + of messages in the queue. + + If the total size of all the messages in the queue exceeds this + limit, new messages will be dropped (or dead-lettered if a dead + letter exchange is active). + + **RabbitMQ extension**: Only available when using RabbitMQ. + + max_priority (int): Set the highest priority number for this queue. + + For example if the value is 10, then messages can delivered to + this queue can have a ``priority`` value between 0 and 10, + where 10 is the highest priority. + + RabbitMQ queues without a max priority set will ignore + the priority field in the message, so if you want priorities + you need to set the max priority field to declare the queue + as a priority queue. + + **RabbitMQ extension**: Only available when using RabbitMQ. + queue_arguments (Dict): Additional arguments used when declaring the queue. Can be used to to set the arguments value for RabbitMQ/AMQP's ``queue.declare``. @@ -488,6 +550,11 @@ class Queue(MaybeChannelBound): ('alias', None), ('bindings', list), ('no_declare', bool), + ('expires', float), + ('message_ttl', float), + ('max_length', int), + ('max_length_bytes', int), + ('max_priority', int) ) def __init__(self, name='', exchange=None, routing_key='', @@ -556,13 +623,24 @@ class Queue(MaybeChannelBound): The client can use this to check whether a queue exists without modifying the server state. """ - ret = self.channel.queue_declare(queue=self.name, - passive=passive, - durable=self.durable, - exclusive=self.exclusive, - auto_delete=self.auto_delete, - arguments=self.queue_arguments, - nowait=nowait) + queue_arguments = self.channel.prepare_queue_arguments( + self.queue_arguments or {}, + expires=self.expires, + message_ttl=self.message_ttl, + max_length=self.max_length, + max_length_bytes=self.max_length_bytes, + max_priority=self.max_priority, + ) + print('QUEUE ARGUMENTS: %r' % (queue_arguments,)) + ret = self.channel.queue_declare( + queue=self.name, + passive=passive, + durable=self.durable, + exclusive=self.exclusive, + auto_delete=self.auto_delete, + arguments=queue_arguments, + nowait=nowait, + ) if not self.name: self.name = ret[0] if self.on_declared: diff --git a/kombu/transport/base.py b/kombu/transport/base.py index 6d381b83..0f30bde0 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -7,11 +7,62 @@ import socket from amqp.exceptions import RecoverableConnectionError from kombu.exceptions import ChannelError, ConnectionError +from kombu.five import items from kombu.message import Message +from kombu.utils.functional import dictfilter from kombu.utils.objects import cached_property +from kombu.utils.time import maybe_s_to_ms __all__ = ['Message', 'StdChannel', 'Management', 'Transport'] +RABBITMQ_QUEUE_ARGUMENTS = { # type: Mapping[str, Tuple[str, Callable]] + 'expires': ('x-expires', maybe_s_to_ms), + 'message_ttl': ('x-message-ttl', maybe_s_to_ms), + 'max_length': ('x-max-length', int), + 'max_length_bytes': ('x-max-length-bytes', int), + 'max_priority': ('x-max-priority', int), +} + + +def to_rabbitmq_queue_arguments(arguments, **options): + # type: (Mapping, **Any) -> Dict + """Convert queue arguments to RabbitMQ queue arguments. + + This is the implementation for Channel.prepare_queue_arguments + for AMQP-based transports. It's used by both the pyamqp and librabbitmq + transports. + + Arguments: + arguments (Mapping): + User-supplied arguments (``Queue.queue_arguments``). + + Keyword Arguments: + expires (float): Queue expiry time in seconds. + This will be converted to ``x-expires`` in int milliseconds. + message_ttl (float): Message TTL in seconds. + This will be converted to ``x-message-ttl`` in int milliseconds. + max_length (int): Max queue length (in number of messages). + This will be converted to ``x-max-length`` int. + max_length_bytes (int): Max queue size in bytes. + This will be converted to ``x-max-length-bytes`` int. + max_priority (int): Max priority steps for queue. + This will be converted to ``x-max-priority`` int. + + Returns: + Dict: RabbitMQ compatible queue arguments. + """ + prepared = dictfilter(dict( + _to_rabbitmq_queue_argument(key, value) + for key, value in items(options) + )) + return dict(arguments, **prepared) if prepared else arguments + + +def _to_rabbitmq_queue_argument(key, value): + # type: (str, Any) -> Tuple[str, Any] + opt, typ = RABBITMQ_QUEUE_ARGUMENTS[key] + return opt, typ(value) if value is not None else value + def _LeftBlank(obj, method): return NotImplementedError( @@ -44,6 +95,9 @@ class StdChannel(object): """ pass + def prepare_queue_arguments(self, arguments, **kwargs): + return arguments + def __enter__(self): return self diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py index 93de2ed0..d6a58bfc 100644 --- a/kombu/transport/librabbitmq.py +++ b/kombu/transport/librabbitmq.py @@ -16,6 +16,7 @@ from kombu.utils.amq_manager import get_manager from kombu.utils.text import version_string_as_tuple from . import base +from .base import to_rabbitmq_queue_arguments W_VERSION = """ librabbitmq version too old to detect RabbitMQ version information @@ -60,6 +61,9 @@ class Channel(amqp.Channel, base.StdChannel): 'priority': priority}) return body, properties + def prepare_queue_arguments(self, arguments, **kwargs): + return to_rabbitmq_queue_arguments(arguments, **kwargs) + class Connection(amqp.Connection): """AMQP Connection (librabbitmq).""" diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py index bc424653..bb78f85b 100644 --- a/kombu/transport/pyamqp.py +++ b/kombu/transport/pyamqp.py @@ -8,6 +8,7 @@ from kombu.utils.amq_manager import get_manager from kombu.utils.text import version_string_as_tuple from . import base +from .base import to_rabbitmq_queue_arguments DEFAULT_PORT = 5672 DEFAULT_SSL_PORT = 5671 @@ -48,6 +49,9 @@ class Channel(amqp.Channel, base.StdChannel): **properties or {} ) + def prepare_queue_arguments(self, arguments, **kwargs): + return to_rabbitmq_queue_arguments(arguments, **kwargs) + def message_to_python(self, raw_message): """Convert encoded message body back to a Python value.""" return self.Message(self, raw_message) diff --git a/kombu/utils/time.py b/kombu/utils/time.py new file mode 100644 index 00000000..64d31d3b --- /dev/null +++ b/kombu/utils/time.py @@ -0,0 +1,10 @@ +"""Time Utilities.""" +from __future__ import absolute_import, unicode_literals + +__all__ = ['maybe_s_to_ms'] + + +def maybe_s_to_ms(v): + # type: (Optional[Union[int, float]]) -> int + """Convert seconds to milliseconds, but return None for None.""" + return int(float(v) * 1000.0) if v is not None else v diff --git a/t/unit/transport/test_base.py b/t/unit/transport/test_base.py index bf1d551c..786eb1ee 100644 --- a/t/unit/transport/test_base.py +++ b/t/unit/transport/test_base.py @@ -7,7 +7,22 @@ from case import Mock from kombu import Connection, Consumer, Exchange, Producer, Queue from kombu.five import text_t from kombu.message import Message -from kombu.transport.base import StdChannel, Transport, Management +from kombu.transport.base import ( + StdChannel, Transport, Management, to_rabbitmq_queue_arguments, +) + + +@pytest.mark.parametrize('args,input,expected', [ + ({}, {'message_ttl': 20}, {'x-message-ttl': 20000}), + ({}, {'message_ttl': None}, {}), + ({'foo': 'bar'}, {'expires': 30.3}, {'x-expires': 30300, 'foo': 'bar'}), + ({'x-expires': 3}, {'expires': 4}, {'x-expires': 4000}), + ({}, {'max_length': 10}, {'x-max-length': 10}), + ({}, {'max_length_bytes': 1033}, {'x-max-length-bytes': 1033}), + ({}, {'max_priority': 303}, {'x-max-priority': 303}), +]) +def test_rabbitmq_queue_arguments(args, input, expected): + assert to_rabbitmq_queue_arguments(args, **input) == expected class test_StdChannel: diff --git a/t/unit/utils/test_time.py b/t/unit/utils/test_time.py new file mode 100644 index 00000000..4f6ddc0b --- /dev/null +++ b/t/unit/utils/test_time.py @@ -0,0 +1,23 @@ +from __future__ import absolute_import, unicode_literals + +import pytest + +from kombu.utils.time import maybe_s_to_ms + + +@pytest.mark.parametrize('input,expected', [ + (3, 3000), + (3.0, 3000), + (303, 303000), + (303.33, 303330), + (303.333, 303333), + (303.3334, 303333), + (None, None), + (0, 0), +]) +def test_maybe_s_to_ms(input, expected): + ret = maybe_s_to_ms(input) + if expected is None: + assert ret is None + else: + assert ret == expected