diff --git a/kombu/asynchronous/hub.py b/kombu/asynchronous/hub.py index a8c3124f..7cc6e35f 100644 --- a/kombu/asynchronous/hub.py +++ b/kombu/asynchronous/hub.py @@ -254,6 +254,14 @@ class Hub(object): for callback in self.on_close: callback(self) + # Complete remaining todo before Hub close + # Eg: Acknowledge message + # To avoid infinite loop where one of the callables adds items + # to self._ready (via call_soon or otherwise). + # we create new list with current self._ready + for item in list(self._ready): + item() + def _discard(self, fd): fd = fileno(fd) self.readers.pop(fd, None) diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index c3e19203..b4d5093b 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -209,7 +209,14 @@ class Channel(virtual.Channel): message['properties']['MessageDeduplicationId'] else: kwargs['MessageDeduplicationId'] = str(uuid.uuid4()) - self.sqs.send_message(**kwargs) + if message.get('redelivered'): + self.sqs.change_message_visibility( + QueueUrl=q_url, + ReceiptHandle=message['properties']['delivery_tag'], + VisibilityTimeout=0 + ) + else: + self.sqs.send_message(**kwargs) def _message_to_python(self, message, queue_name, queue): body = base64.b64decode(message['Body'].encode()) @@ -377,8 +384,8 @@ class Channel(virtual.Channel): except KeyError: pass else: - self.asynsqs.delete_message(message['sqs_queue'], - sqs_message['ReceiptHandle']) + self.sqs.delete_message(QueueUrl=message['sqs_queue'], + ReceiptHandle=sqs_message['ReceiptHandle']) super(Channel, self).basic_ack(delivery_tag) def _size(self, queue): diff --git a/t/unit/asynchronous/test_hub.py b/t/unit/asynchronous/test_hub.py index 6659a163..6c6519c5 100644 --- a/t/unit/asynchronous/test_hub.py +++ b/t/unit/asynchronous/test_hub.py @@ -236,6 +236,7 @@ class test_Hub: poller = self.hub.poller self.hub.stop() + self.hub._ready = set() self.hub.close() poller.close.assert_called_with() @@ -243,6 +244,7 @@ class test_Hub: self.hub = Hub() assert self.hub.poller self.hub.stop() + self.hub._ready = set() self.hub.close() assert self.hub._poller is None assert self.hub.poller, 'It should be regenerated automatically!' diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py index 4ccf3292..f647f47d 100644 --- a/t/unit/transport/test_SQS.py +++ b/t/unit/transport/test_SQS.py @@ -290,6 +290,16 @@ class test_Channel: results = self.queue(self.channel).get().payload assert message == results + def test_redelivered(self): + self.channel.sqs.change_message_visibility = \ + Mock(name='change_message_visibility') + message = { + 'redelivered': True, + 'properties': {'delivery_tag': 'test_message_id'} + } + self.channel._put(self.producer.routing_key, message) + self.sqs_conn_mock.change_message_visibility.assert_called_once() + def test_put_and_get_bulk(self): # With QoS.prefetch_count = 0 message = 'my test message' @@ -399,3 +409,21 @@ class test_Channel: # called? assert (expected_receive_messages_count == self.sqs_conn_mock._receive_messages_calls) + + def test_basic_ack(self, ): + """Test that basic_ack calls the delete_message properly""" + message = { + 'sqs_message': { + 'ReceiptHandle': '1' + }, + 'sqs_queue': 'testing_queue' + } + mock_messages = Mock() + mock_messages.delivery_info = message + self.channel.qos.append(mock_messages, 1) + self.channel.sqs.delete_message = Mock() + self.channel.basic_ack(1) + self.sqs_conn_mock.delete_message.assert_called_with( + QueueUrl=message['sqs_queue'], + ReceiptHandle=message['sqs_message']['ReceiptHandle'] + )