mirror of https://github.com/celery/kombu.git
redis: brpop doesn't have subsecond timeouts, so use one thread per channel to poll with effectively implementing subsecond polling times per channel. Currently the maximum delay for a message to be received on a channel is 0.3s (compared to 1s before)
This commit is contained in:
parent
4cfe6798cb
commit
3f919656b7
|
@ -8,7 +8,8 @@ Redis transport.
|
|||
:license: BSD, see LICENSE for more details.
|
||||
|
||||
"""
|
||||
from Queue import Empty
|
||||
from threading import Condition, Event, Lock, Thread
|
||||
from Queue import Empty, Queue as _Queue
|
||||
|
||||
from anyjson import serialize, deserialize
|
||||
from redis import Redis
|
||||
|
@ -20,6 +21,86 @@ DEFAULT_PORT = 6379
|
|||
DEFAULT_DB = 0
|
||||
|
||||
|
||||
class ChannelPoller(Thread):
|
||||
|
||||
#: The method used to poll for new messages.
|
||||
drain_events = None
|
||||
|
||||
#: Queue to put inbound messages onto.
|
||||
inbound = None
|
||||
|
||||
#: Condition primitive used to notify poll requests.
|
||||
poll_request = None
|
||||
|
||||
#: Event set when the thread should shut down.
|
||||
shutdown = None
|
||||
|
||||
def __init__(self, drain_events):
|
||||
self.drain_events = drain_events
|
||||
self.inbound = _Queue()
|
||||
self.mutex = Lock()
|
||||
self.poll_request = Condition(self.mutex)
|
||||
self.shutdown = Event()
|
||||
Thread.__init__(self)
|
||||
self.setDaemon(False)
|
||||
|
||||
def run(self):
|
||||
inbound = self.inbound
|
||||
drain_events = self.drain_events
|
||||
poll_request = self.poll_request
|
||||
|
||||
while 1:
|
||||
if self.shutdown.isSet():
|
||||
break
|
||||
|
||||
try:
|
||||
item = drain_events(timeout=1)
|
||||
except Empty:
|
||||
pass
|
||||
else:
|
||||
self.inbound.put_nowait(item)
|
||||
|
||||
if self.shutdown.isSet():
|
||||
break
|
||||
|
||||
# Wait for next poll request
|
||||
#
|
||||
# Timeout needs to be short here, otherwise it will block
|
||||
# shutdown. This means that polling will continue even when
|
||||
# there are no actual calls to `poll`. However, this doesn't
|
||||
# cause any problems for our current use (especially with
|
||||
# the active QoS manager).
|
||||
poll_request.acquire()
|
||||
try:
|
||||
poll_request.wait(1)
|
||||
finally:
|
||||
poll_request.release()
|
||||
|
||||
def poll(self):
|
||||
# start thread on demand.
|
||||
self.ensure_started()
|
||||
|
||||
# notify the thread that a poll request has been initiated.
|
||||
self.poll_request.acquire()
|
||||
try:
|
||||
self.poll_request.notify()
|
||||
finally:
|
||||
self.poll_request.release()
|
||||
|
||||
# the thread should start to poll now, so just wait
|
||||
# for it to put a message onto the inbound queue.
|
||||
return self.inbound.get(timeout=0.3)
|
||||
|
||||
def ensure_started(self):
|
||||
if not self.isAlive():
|
||||
self.start()
|
||||
|
||||
def close(self):
|
||||
if self.isAlive():
|
||||
self.shutdown.set()
|
||||
self.join()
|
||||
|
||||
|
||||
class Channel(virtual.Channel):
|
||||
_client = None
|
||||
supports_fanout = True
|
||||
|
@ -27,6 +108,15 @@ class Channel(virtual.Channel):
|
|||
keyprefix_queue = "_kombu.binding.%s"
|
||||
sep = '\x06\x16'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super_ = super(Channel, self)
|
||||
super_.__init__(*args, **kwargs)
|
||||
|
||||
self._poller = ChannelPoller(super_.drain_events)
|
||||
|
||||
def drain_events(self, timeout=None):
|
||||
return self._poller.poll()
|
||||
|
||||
def _queue_bind(self, exchange, routing_key, pattern, queue):
|
||||
self.client.sadd(self.keyprefix_queue % (exchange, ),
|
||||
self.sep.join([routing_key or "",
|
||||
|
@ -65,6 +155,7 @@ class Channel(virtual.Channel):
|
|||
return size
|
||||
|
||||
def close(self):
|
||||
self._poller.close()
|
||||
super(Channel, self).close()
|
||||
try:
|
||||
self.client.bgsave()
|
||||
|
|
Loading…
Reference in New Issue