kombu/funtests/tests/test_SQS.py

23 lines
629 B
Python
Raw Normal View History

import os
from nose import SkipTest
from funtests import transport
class test_SQS(transport.TransportCase):
transport = "SQS"
prefix = "sqs"
sep = "-" # SQS queue names cannot include '.'
event_loop_max = 100
message_size_limit = 4192 # SQS max body size / 2.
def before_connect(self):
if "AWS_ACCESS_KEY_ID" not in os.environ:
raise SkipTest("Missing envvar AWS_ACCESS_KEY_ID")
if "AWS_SECRET_ACCESS_KEY" not in os.environ:
raise SkipTest("Missing envvar AWS_SECRET_ACCESS_KEY")
def after_connect(self, connection):
connection.channel().client