kombu/funtests/tests/test_SQS.py

25 lines
734 B
Python
Raw Normal View History

import os
from nose import SkipTest
from funtests import transport
class test_SQS(transport.TransportCase):
2013-04-15 16:44:13 +00:00
transport = 'SQS'
prefix = 'sqs'
event_loop_max = 100
message_size_limit = 4192 # SQS max body size / 2.
2011-06-07 14:20:09 +00:00
reliable_purge = False
suppress_disorder_warning = True # does not guarantee FIFO order,
# even in simple cases.
def before_connect(self):
2013-04-15 16:44:13 +00:00
if 'AWS_ACCESS_KEY_ID' not in os.environ:
raise SkipTest('Missing envvar AWS_ACCESS_KEY_ID')
if 'AWS_SECRET_ACCESS_KEY' not in os.environ:
raise SkipTest('Missing envvar AWS_SECRET_ACCESS_KEY')
def after_connect(self, connection):
2011-06-05 18:02:51 +00:00
connection.channel().sqs