mirror of https://github.com/celery/kombu.git
Delegate working with AutoLockRenewer to receiver instance.
This commit is contained in:
parent
f1e06c2ee6
commit
59fe35ef2f
|
@ -202,9 +202,14 @@ class Channel(virtual.Channel):
|
|||
cache_key = queue_cache_key or queue
|
||||
queue_obj = self._queue_cache.get(cache_key, None)
|
||||
if queue_obj is None or queue_obj.receiver is None:
|
||||
auto_lock_renewer = None
|
||||
if self.use_lock_renewal:
|
||||
auto_lock_renewer = AutoLockRenewer(max_lock_renewal_duration=self.max_lock_renewal_duration)
|
||||
|
||||
receiver = self.queue_service.get_queue_receiver(
|
||||
queue_name=queue, receive_mode=recv_mode,
|
||||
keep_alive=self.uamqp_keep_alive_interval)
|
||||
keep_alive=self.uamqp_keep_alive_interval,
|
||||
auto_lock_renewer=auto_lock_renewer)
|
||||
queue_obj = self._add_queue_to_cache(cache_key, receiver=receiver)
|
||||
return queue_obj
|
||||
|
||||
|
@ -277,14 +282,6 @@ class Channel(virtual.Channel):
|
|||
# message.body is either byte or generator[bytes]
|
||||
message = messages[0]
|
||||
|
||||
if self.use_lock_renewal:
|
||||
with self.queue_service.get_queue_receiver(
|
||||
queue_name=queue,
|
||||
receive_mode=ServiceBusReceiveMode.PEEK_LOCK,
|
||||
keep_alive=self.uamqp_keep_alive_interval
|
||||
) as receiver, AutoLockRenewer() as lock_renewer:
|
||||
lock_renewer.register(receiver, message, max_lock_renewal_duration=self.max_lock_renewal_duration)
|
||||
|
||||
if not isinstance(message.body, bytes):
|
||||
body = b''.join(message.body)
|
||||
else:
|
||||
|
|
Loading…
Reference in New Issue