diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 91d8a106..6c89fd5d 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -288,7 +288,7 @@ class Channel(virtual.Channel): if resp['Messages']: for m in resp['Messages']: - m['Body'] = AsyncMessage().decode(m['Body']) + m['Body'] = AsyncMessage(body=m['Body']).decode() for msg in self._messages_to_python(resp['Messages'], queue): self.connection._deliver(msg, queue) return @@ -300,7 +300,7 @@ class Channel(virtual.Channel): resp = self.sqs.receive_message(q_url) if resp['Messages']: - body = AsyncMessage().decode(resp['Messages'][0]['Body']) + body = AsyncMessage(body=resp['Messages'][0]['Body']).decode() resp['Messages'][0]['Body'] = body return self._messages_to_python(resp['Messages'], queue)[0] raise Empty() @@ -399,7 +399,7 @@ class Channel(virtual.Channel): size += int(self._size(queue)) if not size: break - self.sqs.purge_queue(q) + self.sqs.purge_queue(QueueUrl=q) return size def close(self): @@ -420,19 +420,20 @@ class Channel(virtual.Channel): aws_secret_access_key=self.conninfo.password, ) is_secure = self.is_secure if self.is_secure is not None else True - self._sqs = session.client('sqs', use_ssl=is_secure) + client_kwargs = dict( + use_ssl=is_secure + ) + if self.endpoint_url is not None: + client_kwargs['endpoint_url'] = self.endpoint_url + self._sqs = session.client('sqs', **client_kwargs) return self._sqs @property def asynsqs(self): if self._asynsqs is None: - is_secure = self.is_secure if self.is_secure is not None else True self._asynsqs = AsyncSQSConnection( sqs_connection=self.sqs, - aws_access_key_id=self.conninfo.userid, - aws_secret_access_key=self.conninfo.password, - region=self.region, - is_secure=is_secure, + region=self.region ) return self._asynsqs @@ -473,6 +474,20 @@ class Channel(virtual.Channel): def port(self): return self.transport_options.get('port') + @cached_property + def endpoint_url(self): + if self.conninfo.hostname is not None: + scheme = 'https' if self.is_secure else 'http' + if self.conninfo.port is not None: + port = ':{}'.format(self.conninfo.port) + else: + port = '' + return '{}://{}{}'.format( + scheme, + self.conninfo.hostname, + port + ) + @cached_property def wait_time_seconds(self): return self.transport_options.get('wait_time_seconds', diff --git a/requirements/funtest.txt b/requirements/funtest.txt index 033db4b8..50e75356 100644 --- a/requirements/funtest.txt +++ b/requirements/funtest.txt @@ -8,7 +8,6 @@ pymongo kazoo # SQS transport -boto boto3 # Qpid transport