mirror of https://github.com/celery/kombu.git
Add peek lock settings to be changed using transport options (#1119)
This commit is contained in:
parent
ccc9e01f32
commit
0c6444ac88
1
AUTHORS
1
AUTHORS
|
@ -97,6 +97,7 @@ Mher Movsisyan <mher.movsisyan@gmail.com>
|
|||
Michael Barrett <mb@eventbrite.com>
|
||||
Michael Nelson <michaeln@telesign.com>
|
||||
Nathan Van Gheem <vangheem@gmail.com>
|
||||
Nicolas Mota <nicolas_mota@me.com>
|
||||
Nitzan Miron <bug.assembla@bugbug.me>
|
||||
Noah Kantrowitz <noah@coderanger.net>
|
||||
Ollie Walsh <ollie.walsh@geemail.kom>
|
||||
|
|
|
@ -50,6 +50,7 @@ class Channel(virtual.Channel):
|
|||
|
||||
default_visibility_timeout = 1800 # 30 minutes.
|
||||
default_wait_time_seconds = 5 # in seconds
|
||||
default_peek_lock = False
|
||||
domain_format = 'kombu%(vhost)s'
|
||||
_queue_service = None
|
||||
_queue_cache = {}
|
||||
|
@ -95,7 +96,7 @@ class Channel(virtual.Channel):
|
|||
message = self.queue_service.receive_queue_message(
|
||||
self.entity_name(queue),
|
||||
timeout=timeout or self.wait_time_seconds,
|
||||
peek_lock=False
|
||||
peek_lock=self.peek_lock
|
||||
)
|
||||
|
||||
if message.body is None:
|
||||
|
@ -154,6 +155,11 @@ class Channel(virtual.Channel):
|
|||
return self.transport_options.get('wait_time_seconds',
|
||||
self.default_wait_time_seconds)
|
||||
|
||||
@cached_property
|
||||
def peek_lock(self):
|
||||
return self.transport_options.get('peek_lock',
|
||||
self.default_peek_lock)
|
||||
|
||||
|
||||
class Transport(virtual.Transport):
|
||||
"""Azure Service Bus transport."""
|
||||
|
|
|
@ -25,7 +25,7 @@ class QueueMock(object):
|
|||
def __init__(self, name):
|
||||
self.name = name
|
||||
self.messages = []
|
||||
self.message_count = 0
|
||||
self.message_count = len(self.messages)
|
||||
|
||||
def __repr__(self):
|
||||
return 'QueueMock: {} messages'.format(len(self.messages))
|
||||
|
@ -84,14 +84,12 @@ class AzureServiceBusClientMock(object):
|
|||
|
||||
def receive_queue_message(self, queue_name, peek_lock=True, timeout=60):
|
||||
queue = self.get_queue(queue_name)
|
||||
if queue:
|
||||
try:
|
||||
return queue.messages.pop(0)
|
||||
except IndexError:
|
||||
return Message()
|
||||
if queue and len(queue.messages):
|
||||
return queue.messages.pop(0)
|
||||
return Message()
|
||||
|
||||
def read_delete_queue_message(self, queue_name, timeout='60'):
|
||||
return self.receive_queue_message(queue_name)
|
||||
return self.receive_queue_message(queue_name, timeout=timeout)
|
||||
|
||||
def delete_queue(self, queue_name=None):
|
||||
queue = self.get_queue(queue_name)
|
||||
|
@ -188,6 +186,18 @@ class test_Channel:
|
|||
self.channel.transport_options['wait_time_seconds'] = 10
|
||||
assert self.channel.wait_time_seconds == 10
|
||||
|
||||
def test_peek_lock(self):
|
||||
# Test getting default peek lock
|
||||
assert (
|
||||
self.channel.peek_lock ==
|
||||
azureservicebus.Channel.default_peek_lock
|
||||
)
|
||||
|
||||
# Test getting value setted in transport options
|
||||
del self.channel.peek_lock
|
||||
self.channel.transport_options['peek_lock'] = True
|
||||
assert self.channel.peek_lock is True
|
||||
|
||||
def test_get_from_azure(self):
|
||||
# Test getting a single message
|
||||
message = 'my test message'
|
||||
|
@ -222,6 +232,8 @@ class test_Channel:
|
|||
# Test deleting queue without message
|
||||
queue_name = 'new_unittest_queue'
|
||||
self.channel._new_queue(queue_name)
|
||||
|
||||
assert queue_name in self.channel._queue_cache
|
||||
self.channel._delete(queue_name)
|
||||
assert queue_name not in self.channel._queue_cache
|
||||
|
||||
|
|
Loading…
Reference in New Issue