mirror of https://github.com/celery/kombu.git
Amazon SQS transport passing functional tests. Sponsored by the good guys at Yipit.com!
This commit is contained in:
parent
2644950f74
commit
1ab629c23c
|
@ -0,0 +1,22 @@
|
|||
import os
|
||||
|
||||
from nose import SkipTest
|
||||
|
||||
from funtests import transport
|
||||
|
||||
|
||||
class test_SQS(transport.TransportCase):
|
||||
transport = "SQS"
|
||||
prefix = "sqs"
|
||||
sep = "-" # SQS queue names cannot include '.'
|
||||
event_loop_max = 100
|
||||
message_size_limit = 4192 # SQS max body size / 2.
|
||||
|
||||
def before_connect(self):
|
||||
if "AWS_ACCESS_KEY_ID" not in os.environ:
|
||||
raise SkipTest("Missing envvar AWS_ACCESS_KEY_ID")
|
||||
if "AWS_SECRET_ACCESS_KEY" not in os.environ:
|
||||
raise SkipTest("Missing envvar AWS_SECRET_ACCESS_KEY")
|
||||
|
||||
def after_connect(self, connection):
|
||||
connection.channel().client
|
|
@ -56,6 +56,9 @@ def consumeN(conn, consumer, n=1, timeout=30):
|
|||
class TransportCase(unittest.TestCase):
|
||||
transport = None
|
||||
prefix = None
|
||||
sep = '.'
|
||||
userid = None
|
||||
password = None
|
||||
event_loop_max = 100
|
||||
connection_options = {}
|
||||
|
||||
|
@ -86,6 +89,10 @@ class TransportCase(unittest.TestCase):
|
|||
map(chan.queue_purge, names)
|
||||
|
||||
def get_connection(self, **options):
|
||||
if self.userid:
|
||||
options.setdefault("userid", self.userid)
|
||||
if self.password:
|
||||
options.setdefault("password", self.password)
|
||||
return BrokerConnection(transport=self.transport, **options)
|
||||
|
||||
def do_connect(self):
|
||||
|
@ -160,7 +167,7 @@ class TransportCase(unittest.TestCase):
|
|||
self.purge([self.queue.name])
|
||||
|
||||
def P(self, rest):
|
||||
return "%s.%s" % (self.prefix, rest)
|
||||
return "%s%s%s" % (self.prefix, self.sep, rest)
|
||||
|
||||
def test_produce__consume_multiple(self):
|
||||
if not self.verify_alive():
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
|
||||
"""
|
||||
kombu.transport.SQS
|
||||
===================
|
||||
|
||||
Amazon SQS transport.
|
||||
|
||||
:copyright: (c) 2010 - 2011 by Ask Solem
|
||||
:license: BSD, see LICENSE for more details.
|
||||
|
||||
"""
|
||||
from Queue import Empty
|
||||
|
||||
from anyjson import serialize, deserialize
|
||||
from boto.sqs.connection import SQSConnection
|
||||
from boto.sqs.message import Message
|
||||
|
||||
from kombu.transport import virtual
|
||||
from kombu.utils import cached_property
|
||||
|
||||
class Channel(virtual.Channel):
|
||||
_client = None
|
||||
|
||||
def _new_queue(self, queue, **kwargs):
|
||||
return self.client.create_queue(queue, self.visibility_timeout)
|
||||
|
||||
def _get(self, queue):
|
||||
q = self._new_queue(queue)
|
||||
rs = q.get_messages(1)
|
||||
if rs:
|
||||
return deserialize(rs[0].get_body())
|
||||
raise Empty()
|
||||
|
||||
def _size(self, queue):
|
||||
return self._new_queue(queue).count()
|
||||
|
||||
def _put(self, queue, message, **kwargs):
|
||||
q = self._new_queue(queue)
|
||||
m = Message()
|
||||
m.set_body(serialize(message))
|
||||
q.write(m)
|
||||
|
||||
def _purge(self, queue):
|
||||
q = self._new_queue(queue)
|
||||
size = q.count()
|
||||
q.clear()
|
||||
return size
|
||||
|
||||
def close(self):
|
||||
super(Channel, self).close()
|
||||
if self._client:
|
||||
try:
|
||||
self._client.close()
|
||||
except AttributeError, exc: # FIXME ???
|
||||
if "can't set attribute" not in str(exc):
|
||||
raise
|
||||
|
||||
def _open(self):
|
||||
conninfo = self.connection.client
|
||||
return SQSConnection(conninfo.userid, conninfo.password)
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
if self._client is None:
|
||||
self._client = self._open()
|
||||
return self._client
|
||||
|
||||
@cached_property
|
||||
def visibility_timeout(self):
|
||||
options = self.connection.client.transport_options
|
||||
return options.get("visibility_timeout")
|
||||
|
||||
|
||||
class Transport(virtual.Transport):
|
||||
Channel = Channel
|
||||
|
||||
interval = 1
|
||||
default_port = None
|
||||
connection_errors = ()
|
||||
channel_errors = ()
|
|
@ -62,7 +62,6 @@ def _ghettoq(name, new, alias=None):
|
|||
|
||||
You should replace %r with simply: %r
|
||||
""" % (name, gtransport, this))
|
||||
print("TTT: %r" % ktransport)
|
||||
return ktransport
|
||||
|
||||
return __inner
|
||||
|
@ -75,6 +74,7 @@ TRANSPORT_ALIASES = {
|
|||
"syncpika": "kombu.transport.pypika.SyncTransport",
|
||||
"memory": "kombu.transport.memory.Transport",
|
||||
"redis": "kombu.transport.pyredis.Transport",
|
||||
"SQS": "kombu.transport.SQS.Transport",
|
||||
"beanstalk": "kombu.transport.beanstalk.Transport",
|
||||
"mongodb": "kombu.transport.mongodb.Transport",
|
||||
"couchdb": "kombu.transport.pycouchdb.Transport",
|
||||
|
|
|
@ -152,7 +152,8 @@ class AsyncoreConnection(asyncore_adapter.AsyncoreConnection):
|
|||
class SyncTransport(base.Transport):
|
||||
default_port = DEFAULT_PORT
|
||||
|
||||
connection_errors = (exceptions.ConnectionClosed,
|
||||
connection_errors = (socket.error,
|
||||
exceptions.ConnectionClosed,
|
||||
exceptions.ChannelClosed,
|
||||
exceptions.LoginError,
|
||||
exceptions.NoFreeChannels,
|
||||
|
|
Loading…
Reference in New Issue