From 59fe35ef2f800165adc1c508080d8f8a7e7deed7 Mon Sep 17 00:00:00 2001 From: Konstantin Lapkovsky Date: Wed, 13 Sep 2023 16:10:59 +0400 Subject: [PATCH] Delegate working with AutoLockRenewer to receiver instance. --- kombu/transport/azureservicebus.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/kombu/transport/azureservicebus.py b/kombu/transport/azureservicebus.py index a9ea004e..ceb52f83 100644 --- a/kombu/transport/azureservicebus.py +++ b/kombu/transport/azureservicebus.py @@ -202,9 +202,14 @@ class Channel(virtual.Channel): cache_key = queue_cache_key or queue queue_obj = self._queue_cache.get(cache_key, None) if queue_obj is None or queue_obj.receiver is None: + auto_lock_renewer = None + if self.use_lock_renewal: + auto_lock_renewer = AutoLockRenewer(max_lock_renewal_duration=self.max_lock_renewal_duration) + receiver = self.queue_service.get_queue_receiver( queue_name=queue, receive_mode=recv_mode, - keep_alive=self.uamqp_keep_alive_interval) + keep_alive=self.uamqp_keep_alive_interval, + auto_lock_renewer=auto_lock_renewer) queue_obj = self._add_queue_to_cache(cache_key, receiver=receiver) return queue_obj @@ -277,14 +282,6 @@ class Channel(virtual.Channel): # message.body is either byte or generator[bytes] message = messages[0] - if self.use_lock_renewal: - with self.queue_service.get_queue_receiver( - queue_name=queue, - receive_mode=ServiceBusReceiveMode.PEEK_LOCK, - keep_alive=self.uamqp_keep_alive_interval - ) as receiver, AutoLockRenewer() as lock_renewer: - lock_renewer.register(receiver, message, max_lock_renewal_duration=self.max_lock_renewal_duration) - if not isinstance(message.body, bytes): body = b''.join(message.body) else: