diff --git a/README.rst b/README.rst index e23b9e21..f54904c4 100644 --- a/README.rst +++ b/README.rst @@ -30,7 +30,8 @@ Features * Virtual transports makes it really easy to add support for non-AMQP transports. There is already built-in support for `Redis`_, - `Beanstalk`_, `Amazon SQS`_, `CouchDB`_, `MongoDB`_ and `ZooKeeper`_. + `Beanstalk`_, `Amazon SQS`_, `CouchDB`_, `MongoDB`_ and `ZooKeeper`_, + `SoftLayer MQ`_. * You can also use the SQLAlchemy and Django ORM transports to use a database as the broker. @@ -71,6 +72,7 @@ and the `Wikipedia article about AMQP`_. .. _`Wikipedia article about AMQP`: http://en.wikipedia.org/wiki/AMQP .. _`carrot`: http://pypi.python.org/pypi/carrot/ .. _`librabbitmq`: http://pypi.python.org/pypi/librabbitmq +.. _`SoftLayer Message Queue`: http://www.softlayer.com/services/additional/message-queue Transport Comparison @@ -99,6 +101,8 @@ Transport Comparison +---------------+----------+------------+------------+---------------+ | *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No | +---------------+----------+------------+------------+---------------+ +| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No | ++---------------+----------+------------+------------+---------------+ .. [#f1] Declarations only kept in memory, so exchanges/queues diff --git a/docs/reference/kombu.transport.SLMQ.rst b/docs/reference/kombu.transport.SLMQ.rst new file mode 100644 index 00000000..2eba589c --- /dev/null +++ b/docs/reference/kombu.transport.SLMQ.rst @@ -0,0 +1,20 @@ +.. currentmodule:: kombu.transport.SLMQ + +.. automodule:: kombu.transport.SLMQ + + .. contents:: + :local: + + Transport + --------- + + .. autoclass:: Transport + :members: + :undoc-members: + + Channel + ------- + + .. autoclass:: Channel + :members: + :undoc-members: diff --git a/funtests/tests/test_SLMQ.py b/funtests/tests/test_SLMQ.py new file mode 100644 index 00000000..d8fd47a1 --- /dev/null +++ b/funtests/tests/test_SLMQ.py @@ -0,0 +1,29 @@ + +from funtests import transport +from nose import SkipTest +import os + + +class test_SLMQ(transport.TransportCase): + transport = "SLMQ" + prefix = "slmq" + event_loop_max = 100 + message_size_limit = 4192 + reliable_purge = False + suppress_disorder_warning = True # does not guarantee FIFO order, + # even in simple cases. + + def before_connect(self): + if "SLMQ_ACCOUNT" not in os.environ: + raise SkipTest("Missing envvar SLMQ_ACCOUNT") + if "SL_USERNAME" not in os.environ: + raise SkipTest("Missing envvar SL_USERNAME") + if "SL_API_KEY" not in os.environ: + raise SkipTest("Missing envvar SL_API_KEY") + if "SLMQ_HOST" not in os.environ: + raise SkipTest("Missing envvar SLMQ_HOST") + if "SLMQ_SECURE" not in os.environ: + raise SkipTest("Missing envvar SLMQ_SECURE") + + def after_connect(self, connection): + pass diff --git a/kombu/transport/SLMQ.py b/kombu/transport/SLMQ.py new file mode 100644 index 00000000..75d8f0b8 --- /dev/null +++ b/kombu/transport/SLMQ.py @@ -0,0 +1,176 @@ +""" +kombu.transport.SLMQ +=================== + +SoftLayer Message Queue transport. + +""" +from __future__ import absolute_import + +import socket +import string + +from Queue import Empty + +from anyjson import loads, dumps + +import softlayer_messaging +import os + +from kombu.utils import cached_property # , uuid +from kombu.utils.encoding import safe_str + +from . import virtual + +# dots are replaced by dash, all other punctuation replaced by underscore. +CHARS_REPLACE_TABLE = dict( + (ord(c), 0x5f) for c in string.punctuation if c not in '_') + + +class Channel(virtual.Channel): + default_visibility_timeout = 1800 # 30 minutes. + domain_format = 'kombu%(vhost)s' + _slmq = None + _queue_cache = {} + _noack_queues = set() + + def __init__(self, *args, **kwargs): + super(Channel, self).__init__(*args, **kwargs) + queues = self.slmq.queues() + for queue in queues: + self._queue_cache[queue] = queue + + def basic_consume(self, queue, no_ack, *args, **kwargs): + if no_ack: + self._noack_queues.add(queue) + return super(Channel, self).basic_consume(queue, no_ack, + *args, **kwargs) + + def basic_cancel(self, consumer_tag): + if consumer_tag in self._consumers: + queue = self._tag_to_queue[consumer_tag] + self._noack_queues.discard(queue) + return super(Channel, self).basic_cancel(consumer_tag) + + def entity_name(self, name, table=CHARS_REPLACE_TABLE): + """Format AMQP queue name into a valid SLQS queue name.""" + return unicode(safe_str(name)).translate(table) + + def _new_queue(self, queue, **kwargs): + """Ensures a queue exists in SLQS.""" + queue = self.entity_name(self.queue_name_prefix + queue) + try: + return self._queue_cache[queue] + except KeyError: + try: + self.slmq.create_queue( + queue, visibility_timeout=self.visibility_timeout) + except softlayer_messaging.errors.ResponseError: + pass + q = self._queue_cache[queue] = self.slmq.queue(queue) + return q + + def _delete(self, queue, *args): + """delete queue by name.""" + queue_name = self.entity_name(queue) + self._queue_cache.pop(queue_name, None) + self.slmq.queue(queue_name).delete(force=True) + super(Channel, self)._delete(queue_name) + + def _put(self, queue, message, **kwargs): + """Put message onto queue.""" + q = self._new_queue(queue) + q.push(dumps(message)) + + def _get(self, queue): + """Try to retrieve a single message off ``queue``.""" + q = self._new_queue(queue) + rs = q.pop(1) + if rs['items']: + m = rs['items'][0] + payload = loads(m['body']) + if queue in self._noack_queues: + q.message(m['id']).delete() + else: + payload['properties']['delivery_info'].update({ + 'slmq_message_id': m['id'], 'slmq_queue_name': q.name}) + return payload + raise Empty() + + def basic_ack(self, delivery_tag): + delivery_info = self.qos.get(delivery_tag).delivery_info + try: + queue = delivery_info['slmq_queue_name'] + except KeyError: + pass + else: + self.delete_message(queue, delivery_info['slmq_message_id']) + super(Channel, self).basic_ack(delivery_tag) + + def _size(self, queue): + """Returns the number of messages in a queue.""" + return self._new_queue(queue).detail()['message_count'] + + def _purge(self, queue): + """Deletes all current messages in a queue.""" + q = self._new_queue(queue) + n = 0 + l = q.pop(10) + while l['items']: + for m in l['items']: + self.delete_message(queue, m['id']) + n += 1 + l = q.pop(10) + return n + + def delete_message(self, queue, message_id): + q = self.slmq.queue(self.entity_name(queue)) + return q.message(message_id).delete() + + @property + def slmq(self): + if self._slmq is None: + conninfo = self.conninfo + account = os.environ.get('SLMQ_ACCOUNT', conninfo.virtual_host) + user = os.environ.get('SL_USERNAME', conninfo.userid) + api_key = os.environ.get('SL_API_KEY', conninfo.password) + host = os.environ.get('SLMQ_HOST', conninfo.hostname) + port = os.environ.get('SLMQ_PORT', conninfo.port) + secure = bool(os.environ.get( + 'SLMQ_SECURE', self.transport_options.get('secure')) or True) + if secure: + endpoint = "https://%s" % host + else: + endpoint = "http://%s" % host + if port: + endpoint = "%s:%s" % (endpoint, port) + + self._slmq = softlayer_messaging.get_client( + account, endpoint=endpoint) + self._slmq.authenticate(user, api_key) + return self._slmq + + @property + def conninfo(self): + return self.connection.client + + @property + def transport_options(self): + return self.connection.client.transport_options + + @cached_property + def visibility_timeout(self): + return (self.transport_options.get('visibility_timeout') or + self.default_visibility_timeout) + + @cached_property + def queue_name_prefix(self): + return self.transport_options.get('queue_name_prefix', '') + + +class Transport(virtual.Transport): + Channel = Channel + + polling_interval = 1 + default_port = None + connection_errors = (softlayer_messaging.ResponseError, socket.error) diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index e14015ef..6f5d26e2 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -59,6 +59,8 @@ TRANSPORT_ALIASES = { 'django': 'kombu.transport.django:Transport', 'sqlalchemy': 'kombu.transport.sqlalchemy:Transport', 'sqla': 'kombu.transport.sqlalchemy:Transport', + 'SLMQ': 'kombu.transport.SLMQ.Transport', + 'slmq': 'kombu.transport.SLMQ.Transport', 'ghettoq.taproot.Redis': _ghettoq('Redis', 'redis', 'redis'), 'ghettoq.taproot.Database': _ghettoq('Database', 'django', 'django'), 'ghettoq.taproot.MongoDB': _ghettoq('MongoDB', 'mongodb'),