diff --git a/funtests/tests/test_SQS.py b/funtests/tests/test_SQS.py new file mode 100644 index 00000000..1b3633d9 --- /dev/null +++ b/funtests/tests/test_SQS.py @@ -0,0 +1,22 @@ +import os + +from nose import SkipTest + +from funtests import transport + + +class test_SQS(transport.TransportCase): + transport = "SQS" + prefix = "sqs" + sep = "-" # SQS queue names cannot include '.' + event_loop_max = 100 + message_size_limit = 4192 # SQS max body size / 2. + + def before_connect(self): + if "AWS_ACCESS_KEY_ID" not in os.environ: + raise SkipTest("Missing envvar AWS_ACCESS_KEY_ID") + if "AWS_SECRET_ACCESS_KEY" not in os.environ: + raise SkipTest("Missing envvar AWS_SECRET_ACCESS_KEY") + + def after_connect(self, connection): + connection.channel().client diff --git a/funtests/transport.py b/funtests/transport.py index e741a155..6086751f 100644 --- a/funtests/transport.py +++ b/funtests/transport.py @@ -56,6 +56,9 @@ def consumeN(conn, consumer, n=1, timeout=30): class TransportCase(unittest.TestCase): transport = None prefix = None + sep = '.' + userid = None + password = None event_loop_max = 100 connection_options = {} @@ -86,6 +89,10 @@ class TransportCase(unittest.TestCase): map(chan.queue_purge, names) def get_connection(self, **options): + if self.userid: + options.setdefault("userid", self.userid) + if self.password: + options.setdefault("password", self.password) return BrokerConnection(transport=self.transport, **options) def do_connect(self): @@ -160,7 +167,7 @@ class TransportCase(unittest.TestCase): self.purge([self.queue.name]) def P(self, rest): - return "%s.%s" % (self.prefix, rest) + return "%s%s%s" % (self.prefix, self.sep, rest) def test_produce__consume_multiple(self): if not self.verify_alive(): diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py new file mode 100644 index 00000000..063fd71a --- /dev/null +++ b/kombu/transport/SQS.py @@ -0,0 +1,80 @@ + +""" +kombu.transport.SQS +=================== + +Amazon SQS transport. + +:copyright: (c) 2010 - 2011 by Ask Solem +:license: BSD, see LICENSE for more details. + +""" +from Queue import Empty + +from anyjson import serialize, deserialize +from boto.sqs.connection import SQSConnection +from boto.sqs.message import Message + +from kombu.transport import virtual +from kombu.utils import cached_property + +class Channel(virtual.Channel): + _client = None + + def _new_queue(self, queue, **kwargs): + return self.client.create_queue(queue, self.visibility_timeout) + + def _get(self, queue): + q = self._new_queue(queue) + rs = q.get_messages(1) + if rs: + return deserialize(rs[0].get_body()) + raise Empty() + + def _size(self, queue): + return self._new_queue(queue).count() + + def _put(self, queue, message, **kwargs): + q = self._new_queue(queue) + m = Message() + m.set_body(serialize(message)) + q.write(m) + + def _purge(self, queue): + q = self._new_queue(queue) + size = q.count() + q.clear() + return size + + def close(self): + super(Channel, self).close() + if self._client: + try: + self._client.close() + except AttributeError, exc: # FIXME ??? + if "can't set attribute" not in str(exc): + raise + + def _open(self): + conninfo = self.connection.client + return SQSConnection(conninfo.userid, conninfo.password) + + @property + def client(self): + if self._client is None: + self._client = self._open() + return self._client + + @cached_property + def visibility_timeout(self): + options = self.connection.client.transport_options + return options.get("visibility_timeout") + + +class Transport(virtual.Transport): + Channel = Channel + + interval = 1 + default_port = None + connection_errors = () + channel_errors = () diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 057d3740..a16614b1 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -62,7 +62,6 @@ def _ghettoq(name, new, alias=None): You should replace %r with simply: %r """ % (name, gtransport, this)) - print("TTT: %r" % ktransport) return ktransport return __inner @@ -75,6 +74,7 @@ TRANSPORT_ALIASES = { "syncpika": "kombu.transport.pypika.SyncTransport", "memory": "kombu.transport.memory.Transport", "redis": "kombu.transport.pyredis.Transport", + "SQS": "kombu.transport.SQS.Transport", "beanstalk": "kombu.transport.beanstalk.Transport", "mongodb": "kombu.transport.mongodb.Transport", "couchdb": "kombu.transport.pycouchdb.Transport", diff --git a/kombu/transport/pypika.py b/kombu/transport/pypika.py index 5926409f..7a8f4e95 100644 --- a/kombu/transport/pypika.py +++ b/kombu/transport/pypika.py @@ -152,7 +152,8 @@ class AsyncoreConnection(asyncore_adapter.AsyncoreConnection): class SyncTransport(base.Transport): default_port = DEFAULT_PORT - connection_errors = (exceptions.ConnectionClosed, + connection_errors = (socket.error, + exceptions.ConnectionClosed, exceptions.ChannelClosed, exceptions.LoginError, exceptions.NoFreeChannels,