diff --git a/AUTHORS b/AUTHORS index 551e3a55..af22a804 100644 --- a/AUTHORS +++ b/AUTHORS @@ -97,6 +97,7 @@ Mher Movsisyan Michael Barrett Michael Nelson Nathan Van Gheem +Nicolas Mota Nitzan Miron Noah Kantrowitz Ollie Walsh diff --git a/kombu/transport/azureservicebus.py b/kombu/transport/azureservicebus.py index e89a3f8b..c96350af 100644 --- a/kombu/transport/azureservicebus.py +++ b/kombu/transport/azureservicebus.py @@ -50,6 +50,7 @@ class Channel(virtual.Channel): default_visibility_timeout = 1800 # 30 minutes. default_wait_time_seconds = 5 # in seconds + default_peek_lock = False domain_format = 'kombu%(vhost)s' _queue_service = None _queue_cache = {} @@ -95,7 +96,7 @@ class Channel(virtual.Channel): message = self.queue_service.receive_queue_message( self.entity_name(queue), timeout=timeout or self.wait_time_seconds, - peek_lock=False + peek_lock=self.peek_lock ) if message.body is None: @@ -154,6 +155,11 @@ class Channel(virtual.Channel): return self.transport_options.get('wait_time_seconds', self.default_wait_time_seconds) + @cached_property + def peek_lock(self): + return self.transport_options.get('peek_lock', + self.default_peek_lock) + class Transport(virtual.Transport): """Azure Service Bus transport.""" diff --git a/t/unit/transport/test_azureservicebus.py b/t/unit/transport/test_azureservicebus.py index c5b5acca..d5670bba 100644 --- a/t/unit/transport/test_azureservicebus.py +++ b/t/unit/transport/test_azureservicebus.py @@ -25,7 +25,7 @@ class QueueMock(object): def __init__(self, name): self.name = name self.messages = [] - self.message_count = 0 + self.message_count = len(self.messages) def __repr__(self): return 'QueueMock: {} messages'.format(len(self.messages)) @@ -84,14 +84,12 @@ class AzureServiceBusClientMock(object): def receive_queue_message(self, queue_name, peek_lock=True, timeout=60): queue = self.get_queue(queue_name) - if queue: - try: - return queue.messages.pop(0) - except IndexError: - return Message() + if queue and len(queue.messages): + return queue.messages.pop(0) + return Message() def read_delete_queue_message(self, queue_name, timeout='60'): - return self.receive_queue_message(queue_name) + return self.receive_queue_message(queue_name, timeout=timeout) def delete_queue(self, queue_name=None): queue = self.get_queue(queue_name) @@ -188,6 +186,18 @@ class test_Channel: self.channel.transport_options['wait_time_seconds'] = 10 assert self.channel.wait_time_seconds == 10 + def test_peek_lock(self): + # Test getting default peek lock + assert ( + self.channel.peek_lock == + azureservicebus.Channel.default_peek_lock + ) + + # Test getting value setted in transport options + del self.channel.peek_lock + self.channel.transport_options['peek_lock'] = True + assert self.channel.peek_lock is True + def test_get_from_azure(self): # Test getting a single message message = 'my test message' @@ -222,6 +232,8 @@ class test_Channel: # Test deleting queue without message queue_name = 'new_unittest_queue' self.channel._new_queue(queue_name) + + assert queue_name in self.channel._queue_cache self.channel._delete(queue_name) assert queue_name not in self.channel._queue_cache