mirror of https://github.com/celery/kombu.git
Fixed Worker shutdown creates duplicate messages in SQS broker (#926)
* On restore changes message visibility instead of send new message * Acknowledge message on hub close * Use sqs instead of async sqs to delete message * changes itertools to range * Empty Hub ready * fixed test_poller_regeneration_on_access * Fixed typo in comment * Simplify loop to process self._ready * Added test case for redelivered _put * Lint fixes * Added test case for delete_message call
This commit is contained in:
parent
0af7519f98
commit
94227bbc43
|
@ -254,6 +254,14 @@ class Hub(object):
|
|||
for callback in self.on_close:
|
||||
callback(self)
|
||||
|
||||
# Complete remaining todo before Hub close
|
||||
# Eg: Acknowledge message
|
||||
# To avoid infinite loop where one of the callables adds items
|
||||
# to self._ready (via call_soon or otherwise).
|
||||
# we create new list with current self._ready
|
||||
for item in list(self._ready):
|
||||
item()
|
||||
|
||||
def _discard(self, fd):
|
||||
fd = fileno(fd)
|
||||
self.readers.pop(fd, None)
|
||||
|
|
|
@ -209,7 +209,14 @@ class Channel(virtual.Channel):
|
|||
message['properties']['MessageDeduplicationId']
|
||||
else:
|
||||
kwargs['MessageDeduplicationId'] = str(uuid.uuid4())
|
||||
self.sqs.send_message(**kwargs)
|
||||
if message.get('redelivered'):
|
||||
self.sqs.change_message_visibility(
|
||||
QueueUrl=q_url,
|
||||
ReceiptHandle=message['properties']['delivery_tag'],
|
||||
VisibilityTimeout=0
|
||||
)
|
||||
else:
|
||||
self.sqs.send_message(**kwargs)
|
||||
|
||||
def _message_to_python(self, message, queue_name, queue):
|
||||
body = base64.b64decode(message['Body'].encode())
|
||||
|
@ -377,8 +384,8 @@ class Channel(virtual.Channel):
|
|||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
self.asynsqs.delete_message(message['sqs_queue'],
|
||||
sqs_message['ReceiptHandle'])
|
||||
self.sqs.delete_message(QueueUrl=message['sqs_queue'],
|
||||
ReceiptHandle=sqs_message['ReceiptHandle'])
|
||||
super(Channel, self).basic_ack(delivery_tag)
|
||||
|
||||
def _size(self, queue):
|
||||
|
|
|
@ -236,6 +236,7 @@ class test_Hub:
|
|||
|
||||
poller = self.hub.poller
|
||||
self.hub.stop()
|
||||
self.hub._ready = set()
|
||||
self.hub.close()
|
||||
poller.close.assert_called_with()
|
||||
|
||||
|
@ -243,6 +244,7 @@ class test_Hub:
|
|||
self.hub = Hub()
|
||||
assert self.hub.poller
|
||||
self.hub.stop()
|
||||
self.hub._ready = set()
|
||||
self.hub.close()
|
||||
assert self.hub._poller is None
|
||||
assert self.hub.poller, 'It should be regenerated automatically!'
|
||||
|
|
|
@ -290,6 +290,16 @@ class test_Channel:
|
|||
results = self.queue(self.channel).get().payload
|
||||
assert message == results
|
||||
|
||||
def test_redelivered(self):
|
||||
self.channel.sqs.change_message_visibility = \
|
||||
Mock(name='change_message_visibility')
|
||||
message = {
|
||||
'redelivered': True,
|
||||
'properties': {'delivery_tag': 'test_message_id'}
|
||||
}
|
||||
self.channel._put(self.producer.routing_key, message)
|
||||
self.sqs_conn_mock.change_message_visibility.assert_called_once()
|
||||
|
||||
def test_put_and_get_bulk(self):
|
||||
# With QoS.prefetch_count = 0
|
||||
message = 'my test message'
|
||||
|
@ -399,3 +409,21 @@ class test_Channel:
|
|||
# called?
|
||||
assert (expected_receive_messages_count ==
|
||||
self.sqs_conn_mock._receive_messages_calls)
|
||||
|
||||
def test_basic_ack(self, ):
|
||||
"""Test that basic_ack calls the delete_message properly"""
|
||||
message = {
|
||||
'sqs_message': {
|
||||
'ReceiptHandle': '1'
|
||||
},
|
||||
'sqs_queue': 'testing_queue'
|
||||
}
|
||||
mock_messages = Mock()
|
||||
mock_messages.delivery_info = message
|
||||
self.channel.qos.append(mock_messages, 1)
|
||||
self.channel.sqs.delete_message = Mock()
|
||||
self.channel.basic_ack(1)
|
||||
self.sqs_conn_mock.delete_message.assert_called_with(
|
||||
QueueUrl=message['sqs_queue'],
|
||||
ReceiptHandle=message['sqs_message']['ReceiptHandle']
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue