mirror of https://github.com/celery/kombu.git
Merge branch 'sudorandom/SLMQ'
This commit is contained in:
commit
f078863bf8
|
@ -30,7 +30,8 @@ Features
|
|||
|
||||
* Virtual transports makes it really easy to add support for non-AMQP
|
||||
transports. There is already built-in support for `Redis`_,
|
||||
`Beanstalk`_, `Amazon SQS`_, `CouchDB`_, `MongoDB`_ and `ZooKeeper`_.
|
||||
`Beanstalk`_, `Amazon SQS`_, `CouchDB`_, `MongoDB`_ and `ZooKeeper`_,
|
||||
`SoftLayer MQ`_.
|
||||
|
||||
* You can also use the SQLAlchemy and Django ORM transports to
|
||||
use a database as the broker.
|
||||
|
@ -71,6 +72,7 @@ and the `Wikipedia article about AMQP`_.
|
|||
.. _`Wikipedia article about AMQP`: http://en.wikipedia.org/wiki/AMQP
|
||||
.. _`carrot`: http://pypi.python.org/pypi/carrot/
|
||||
.. _`librabbitmq`: http://pypi.python.org/pypi/librabbitmq
|
||||
.. _`SoftLayer Message Queue`: http://www.softlayer.com/services/additional/message-queue
|
||||
|
||||
|
||||
Transport Comparison
|
||||
|
@ -99,6 +101,8 @@ Transport Comparison
|
|||
+---------------+----------+------------+------------+---------------+
|
||||
| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No |
|
||||
+---------------+----------+------------+------------+---------------+
|
||||
| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No |
|
||||
+---------------+----------+------------+------------+---------------+
|
||||
|
||||
|
||||
.. [#f1] Declarations only kept in memory, so exchanges/queues
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
.. currentmodule:: kombu.transport.SLMQ
|
||||
|
||||
.. automodule:: kombu.transport.SLMQ
|
||||
|
||||
.. contents::
|
||||
:local:
|
||||
|
||||
Transport
|
||||
---------
|
||||
|
||||
.. autoclass:: Transport
|
||||
:members:
|
||||
:undoc-members:
|
||||
|
||||
Channel
|
||||
-------
|
||||
|
||||
.. autoclass:: Channel
|
||||
:members:
|
||||
:undoc-members:
|
|
@ -0,0 +1,29 @@
|
|||
|
||||
from funtests import transport
|
||||
from nose import SkipTest
|
||||
import os
|
||||
|
||||
|
||||
class test_SLMQ(transport.TransportCase):
|
||||
transport = "SLMQ"
|
||||
prefix = "slmq"
|
||||
event_loop_max = 100
|
||||
message_size_limit = 4192
|
||||
reliable_purge = False
|
||||
suppress_disorder_warning = True # does not guarantee FIFO order,
|
||||
# even in simple cases.
|
||||
|
||||
def before_connect(self):
|
||||
if "SLMQ_ACCOUNT" not in os.environ:
|
||||
raise SkipTest("Missing envvar SLMQ_ACCOUNT")
|
||||
if "SL_USERNAME" not in os.environ:
|
||||
raise SkipTest("Missing envvar SL_USERNAME")
|
||||
if "SL_API_KEY" not in os.environ:
|
||||
raise SkipTest("Missing envvar SL_API_KEY")
|
||||
if "SLMQ_HOST" not in os.environ:
|
||||
raise SkipTest("Missing envvar SLMQ_HOST")
|
||||
if "SLMQ_SECURE" not in os.environ:
|
||||
raise SkipTest("Missing envvar SLMQ_SECURE")
|
||||
|
||||
def after_connect(self, connection):
|
||||
pass
|
|
@ -0,0 +1,176 @@
|
|||
"""
|
||||
kombu.transport.SLMQ
|
||||
===================
|
||||
|
||||
SoftLayer Message Queue transport.
|
||||
|
||||
"""
|
||||
from __future__ import absolute_import
|
||||
|
||||
import socket
|
||||
import string
|
||||
|
||||
from Queue import Empty
|
||||
|
||||
from anyjson import loads, dumps
|
||||
|
||||
import softlayer_messaging
|
||||
import os
|
||||
|
||||
from kombu.utils import cached_property # , uuid
|
||||
from kombu.utils.encoding import safe_str
|
||||
|
||||
from . import virtual
|
||||
|
||||
# dots are replaced by dash, all other punctuation replaced by underscore.
|
||||
CHARS_REPLACE_TABLE = dict(
|
||||
(ord(c), 0x5f) for c in string.punctuation if c not in '_')
|
||||
|
||||
|
||||
class Channel(virtual.Channel):
|
||||
default_visibility_timeout = 1800 # 30 minutes.
|
||||
domain_format = 'kombu%(vhost)s'
|
||||
_slmq = None
|
||||
_queue_cache = {}
|
||||
_noack_queues = set()
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Channel, self).__init__(*args, **kwargs)
|
||||
queues = self.slmq.queues()
|
||||
for queue in queues:
|
||||
self._queue_cache[queue] = queue
|
||||
|
||||
def basic_consume(self, queue, no_ack, *args, **kwargs):
|
||||
if no_ack:
|
||||
self._noack_queues.add(queue)
|
||||
return super(Channel, self).basic_consume(queue, no_ack,
|
||||
*args, **kwargs)
|
||||
|
||||
def basic_cancel(self, consumer_tag):
|
||||
if consumer_tag in self._consumers:
|
||||
queue = self._tag_to_queue[consumer_tag]
|
||||
self._noack_queues.discard(queue)
|
||||
return super(Channel, self).basic_cancel(consumer_tag)
|
||||
|
||||
def entity_name(self, name, table=CHARS_REPLACE_TABLE):
|
||||
"""Format AMQP queue name into a valid SLQS queue name."""
|
||||
return unicode(safe_str(name)).translate(table)
|
||||
|
||||
def _new_queue(self, queue, **kwargs):
|
||||
"""Ensures a queue exists in SLQS."""
|
||||
queue = self.entity_name(self.queue_name_prefix + queue)
|
||||
try:
|
||||
return self._queue_cache[queue]
|
||||
except KeyError:
|
||||
try:
|
||||
self.slmq.create_queue(
|
||||
queue, visibility_timeout=self.visibility_timeout)
|
||||
except softlayer_messaging.errors.ResponseError:
|
||||
pass
|
||||
q = self._queue_cache[queue] = self.slmq.queue(queue)
|
||||
return q
|
||||
|
||||
def _delete(self, queue, *args):
|
||||
"""delete queue by name."""
|
||||
queue_name = self.entity_name(queue)
|
||||
self._queue_cache.pop(queue_name, None)
|
||||
self.slmq.queue(queue_name).delete(force=True)
|
||||
super(Channel, self)._delete(queue_name)
|
||||
|
||||
def _put(self, queue, message, **kwargs):
|
||||
"""Put message onto queue."""
|
||||
q = self._new_queue(queue)
|
||||
q.push(dumps(message))
|
||||
|
||||
def _get(self, queue):
|
||||
"""Try to retrieve a single message off ``queue``."""
|
||||
q = self._new_queue(queue)
|
||||
rs = q.pop(1)
|
||||
if rs['items']:
|
||||
m = rs['items'][0]
|
||||
payload = loads(m['body'])
|
||||
if queue in self._noack_queues:
|
||||
q.message(m['id']).delete()
|
||||
else:
|
||||
payload['properties']['delivery_info'].update({
|
||||
'slmq_message_id': m['id'], 'slmq_queue_name': q.name})
|
||||
return payload
|
||||
raise Empty()
|
||||
|
||||
def basic_ack(self, delivery_tag):
|
||||
delivery_info = self.qos.get(delivery_tag).delivery_info
|
||||
try:
|
||||
queue = delivery_info['slmq_queue_name']
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
self.delete_message(queue, delivery_info['slmq_message_id'])
|
||||
super(Channel, self).basic_ack(delivery_tag)
|
||||
|
||||
def _size(self, queue):
|
||||
"""Returns the number of messages in a queue."""
|
||||
return self._new_queue(queue).detail()['message_count']
|
||||
|
||||
def _purge(self, queue):
|
||||
"""Deletes all current messages in a queue."""
|
||||
q = self._new_queue(queue)
|
||||
n = 0
|
||||
l = q.pop(10)
|
||||
while l['items']:
|
||||
for m in l['items']:
|
||||
self.delete_message(queue, m['id'])
|
||||
n += 1
|
||||
l = q.pop(10)
|
||||
return n
|
||||
|
||||
def delete_message(self, queue, message_id):
|
||||
q = self.slmq.queue(self.entity_name(queue))
|
||||
return q.message(message_id).delete()
|
||||
|
||||
@property
|
||||
def slmq(self):
|
||||
if self._slmq is None:
|
||||
conninfo = self.conninfo
|
||||
account = os.environ.get('SLMQ_ACCOUNT', conninfo.virtual_host)
|
||||
user = os.environ.get('SL_USERNAME', conninfo.userid)
|
||||
api_key = os.environ.get('SL_API_KEY', conninfo.password)
|
||||
host = os.environ.get('SLMQ_HOST', conninfo.hostname)
|
||||
port = os.environ.get('SLMQ_PORT', conninfo.port)
|
||||
secure = bool(os.environ.get(
|
||||
'SLMQ_SECURE', self.transport_options.get('secure')) or True)
|
||||
if secure:
|
||||
endpoint = "https://%s" % host
|
||||
else:
|
||||
endpoint = "http://%s" % host
|
||||
if port:
|
||||
endpoint = "%s:%s" % (endpoint, port)
|
||||
|
||||
self._slmq = softlayer_messaging.get_client(
|
||||
account, endpoint=endpoint)
|
||||
self._slmq.authenticate(user, api_key)
|
||||
return self._slmq
|
||||
|
||||
@property
|
||||
def conninfo(self):
|
||||
return self.connection.client
|
||||
|
||||
@property
|
||||
def transport_options(self):
|
||||
return self.connection.client.transport_options
|
||||
|
||||
@cached_property
|
||||
def visibility_timeout(self):
|
||||
return (self.transport_options.get('visibility_timeout') or
|
||||
self.default_visibility_timeout)
|
||||
|
||||
@cached_property
|
||||
def queue_name_prefix(self):
|
||||
return self.transport_options.get('queue_name_prefix', '')
|
||||
|
||||
|
||||
class Transport(virtual.Transport):
|
||||
Channel = Channel
|
||||
|
||||
polling_interval = 1
|
||||
default_port = None
|
||||
connection_errors = (softlayer_messaging.ResponseError, socket.error)
|
|
@ -59,6 +59,8 @@ TRANSPORT_ALIASES = {
|
|||
'django': 'kombu.transport.django:Transport',
|
||||
'sqlalchemy': 'kombu.transport.sqlalchemy:Transport',
|
||||
'sqla': 'kombu.transport.sqlalchemy:Transport',
|
||||
'SLMQ': 'kombu.transport.SLMQ.Transport',
|
||||
'slmq': 'kombu.transport.SLMQ.Transport',
|
||||
'ghettoq.taproot.Redis': _ghettoq('Redis', 'redis', 'redis'),
|
||||
'ghettoq.taproot.Database': _ghettoq('Database', 'django', 'django'),
|
||||
'ghettoq.taproot.MongoDB': _ghettoq('MongoDB', 'mongodb'),
|
||||
|
|
Loading…
Reference in New Issue