mirror of https://github.com/celery/kombu.git
Allow endpoint URL to be specified in the boto3 connection
- Minor fixes after re-enabling unit tests
This commit is contained in:
parent
6c3bb07e19
commit
cdbfe9a64e
|
@ -288,7 +288,7 @@ class Channel(virtual.Channel):
|
|||
|
||||
if resp['Messages']:
|
||||
for m in resp['Messages']:
|
||||
m['Body'] = AsyncMessage().decode(m['Body'])
|
||||
m['Body'] = AsyncMessage(body=m['Body']).decode()
|
||||
for msg in self._messages_to_python(resp['Messages'], queue):
|
||||
self.connection._deliver(msg, queue)
|
||||
return
|
||||
|
@ -300,7 +300,7 @@ class Channel(virtual.Channel):
|
|||
resp = self.sqs.receive_message(q_url)
|
||||
|
||||
if resp['Messages']:
|
||||
body = AsyncMessage().decode(resp['Messages'][0]['Body'])
|
||||
body = AsyncMessage(body=resp['Messages'][0]['Body']).decode()
|
||||
resp['Messages'][0]['Body'] = body
|
||||
return self._messages_to_python(resp['Messages'], queue)[0]
|
||||
raise Empty()
|
||||
|
@ -399,7 +399,7 @@ class Channel(virtual.Channel):
|
|||
size += int(self._size(queue))
|
||||
if not size:
|
||||
break
|
||||
self.sqs.purge_queue(q)
|
||||
self.sqs.purge_queue(QueueUrl=q)
|
||||
return size
|
||||
|
||||
def close(self):
|
||||
|
@ -420,19 +420,20 @@ class Channel(virtual.Channel):
|
|||
aws_secret_access_key=self.conninfo.password,
|
||||
)
|
||||
is_secure = self.is_secure if self.is_secure is not None else True
|
||||
self._sqs = session.client('sqs', use_ssl=is_secure)
|
||||
client_kwargs = dict(
|
||||
use_ssl=is_secure
|
||||
)
|
||||
if self.endpoint_url is not None:
|
||||
client_kwargs['endpoint_url'] = self.endpoint_url
|
||||
self._sqs = session.client('sqs', **client_kwargs)
|
||||
return self._sqs
|
||||
|
||||
@property
|
||||
def asynsqs(self):
|
||||
if self._asynsqs is None:
|
||||
is_secure = self.is_secure if self.is_secure is not None else True
|
||||
self._asynsqs = AsyncSQSConnection(
|
||||
sqs_connection=self.sqs,
|
||||
aws_access_key_id=self.conninfo.userid,
|
||||
aws_secret_access_key=self.conninfo.password,
|
||||
region=self.region,
|
||||
is_secure=is_secure,
|
||||
region=self.region
|
||||
)
|
||||
return self._asynsqs
|
||||
|
||||
|
@ -473,6 +474,20 @@ class Channel(virtual.Channel):
|
|||
def port(self):
|
||||
return self.transport_options.get('port')
|
||||
|
||||
@cached_property
|
||||
def endpoint_url(self):
|
||||
if self.conninfo.hostname is not None:
|
||||
scheme = 'https' if self.is_secure else 'http'
|
||||
if self.conninfo.port is not None:
|
||||
port = ':{}'.format(self.conninfo.port)
|
||||
else:
|
||||
port = ''
|
||||
return '{}://{}{}'.format(
|
||||
scheme,
|
||||
self.conninfo.hostname,
|
||||
port
|
||||
)
|
||||
|
||||
@cached_property
|
||||
def wait_time_seconds(self):
|
||||
return self.transport_options.get('wait_time_seconds',
|
||||
|
|
|
@ -8,7 +8,6 @@ pymongo
|
|||
kazoo
|
||||
|
||||
# SQS transport
|
||||
boto
|
||||
boto3
|
||||
|
||||
# Qpid transport
|
||||
|
|
Loading…
Reference in New Issue