From d8a9e99f89c047a90f7de2538c73505a641d4b27 Mon Sep 17 00:00:00 2001 From: Tommie McAfee Date: Thu, 25 Apr 2013 17:17:30 -0400 Subject: [PATCH] Add Pyro4 transport Use a Pyro proxy as channel. Published object must implement methods used in the Channel class for the shared_queues object. Once published this object can be shared across multiple hosts. Sample object backing: https://github.com/tahmmee/pyroqueue/blob/master/pyroqueues.py --- kombu/transport/pyro.py | 96 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 kombu/transport/pyro.py diff --git a/kombu/transport/pyro.py b/kombu/transport/pyro.py new file mode 100644 index 00000000..f4891779 --- /dev/null +++ b/kombu/transport/pyro.py @@ -0,0 +1,96 @@ +""" +kombu.transport.pyro +====================== + +Pyro transport. + +""" +from __future__ import absolute_import + +import Pyro4 +from Pyro4.errors import NamingError +from Queue import Queue + +from . import virtual + +DEFAULT_PORT = 9090 + +class Channel(virtual.Channel): + + + + def __init__(self, *args, **kwargs): + super(Channel, self).__init__(*args, **kwargs) + transport = args[0] + self.shared_queues = transport.shared_queues + + def queues(self): + return self.shared_queues.get_queue_names() + + def _new_queue(self, queue, **kwargs): + if queue not in self.queues(): + self.shared_queues.new_queue(queue) + + def _get(self, queue, timeout=None): + queue = self._queue_for(queue) + msg = self.shared_queues._get(queue) + return msg + + def _queue_for(self, queue): + if queue not in self.queues(): + self.shared_queues.new_queue(queue) + return queue + + def _put(self, queue, message, **kwargs): + queue = self._queue_for(queue) + self.shared_queues._put(queue, message) + + def _size(self, queue): + return self.shared_queues._size(queue) + + def _delete(self, queue, *args): + self.shared_queues._delete(queue) + + def _purge(self, queue): + return self.shared_queues._purge(queue) + + def after_reply_message_received(self, queue): + pass + + +class Transport(virtual.Transport): + Channel = Channel + + #: memory backend state is global. + state = virtual.BrokerState() + + default_port = DEFAULT_PORT + + driver_type = 'pyro' + driver_name = 'pyro' + + def __init__(self, client, **kwargs): + super(Transport, self).__init__(client) + self.client = client + self.default_port = kwargs.get("default_port") or self.default_port + self.shared_queues = None + + conninfo = self.client + for name, default_value in self.default_connection_params.items(): + if not getattr(conninfo, name, None): + setattr(conninfo, name, default_value) + + if conninfo.hostname == 'localhost': + conninfo.hostname = '127.0.0.1' + + Pyro4.config.HMAC_KEY=conninfo.virtual_host + try: + nameserver = Pyro4.locateNS(host=conninfo.hostname, port=self.default_port) + uri = nameserver.lookup(conninfo.virtual_host) # name of registered pyro object + self.shared_queues = Pyro4.Proxy(uri) + except NamingError as ex: + err = "Unable to locate pyro nameserver (%s) on host %s" % (conninfo.virtual_host, conninfo.hostname) + raise NamingError(err) + + def driver_version(self): + return 'N/A'