diff --git a/kombu/transport/pyredis.py b/kombu/transport/pyredis.py index 1f155a0a..a450ee53 100644 --- a/kombu/transport/pyredis.py +++ b/kombu/transport/pyredis.py @@ -8,7 +8,8 @@ Redis transport. :license: BSD, see LICENSE for more details. """ -from Queue import Empty +from threading import Condition, Event, Lock, Thread +from Queue import Empty, Queue as _Queue from anyjson import serialize, deserialize from redis import Redis @@ -20,6 +21,86 @@ DEFAULT_PORT = 6379 DEFAULT_DB = 0 +class ChannelPoller(Thread): + + #: The method used to poll for new messages. + drain_events = None + + #: Queue to put inbound messages onto. + inbound = None + + #: Condition primitive used to notify poll requests. + poll_request = None + + #: Event set when the thread should shut down. + shutdown = None + + def __init__(self, drain_events): + self.drain_events = drain_events + self.inbound = _Queue() + self.mutex = Lock() + self.poll_request = Condition(self.mutex) + self.shutdown = Event() + Thread.__init__(self) + self.setDaemon(False) + + def run(self): + inbound = self.inbound + drain_events = self.drain_events + poll_request = self.poll_request + + while 1: + if self.shutdown.isSet(): + break + + try: + item = drain_events(timeout=1) + except Empty: + pass + else: + self.inbound.put_nowait(item) + + if self.shutdown.isSet(): + break + + # Wait for next poll request + # + # Timeout needs to be short here, otherwise it will block + # shutdown. This means that polling will continue even when + # there are no actual calls to `poll`. However, this doesn't + # cause any problems for our current use (especially with + # the active QoS manager). + poll_request.acquire() + try: + poll_request.wait(1) + finally: + poll_request.release() + + def poll(self): + # start thread on demand. + self.ensure_started() + + # notify the thread that a poll request has been initiated. + self.poll_request.acquire() + try: + self.poll_request.notify() + finally: + self.poll_request.release() + + # the thread should start to poll now, so just wait + # for it to put a message onto the inbound queue. + return self.inbound.get(timeout=0.3) + + def ensure_started(self): + if not self.isAlive(): + self.start() + + def close(self): + if self.isAlive(): + self.shutdown.set() + self.join() + + class Channel(virtual.Channel): _client = None supports_fanout = True @@ -27,6 +108,15 @@ class Channel(virtual.Channel): keyprefix_queue = "_kombu.binding.%s" sep = '\x06\x16' + def __init__(self, *args, **kwargs): + super_ = super(Channel, self) + super_.__init__(*args, **kwargs) + + self._poller = ChannelPoller(super_.drain_events) + + def drain_events(self, timeout=None): + return self._poller.poll() + def _queue_bind(self, exchange, routing_key, pattern, queue): self.client.sadd(self.keyprefix_queue % (exchange, ), self.sep.join([routing_key or "", @@ -65,6 +155,7 @@ class Channel(virtual.Channel): return size def close(self): + self._poller.close() super(Channel, self).close() try: self.client.bgsave()