Register workers in a central set ("rq:workers").

This commit is contained in:
Vincent Driessen 2011-11-18 17:32:43 +01:00
parent 1f12678468
commit c9ba66bd59
1 changed files with 11 additions and 5 deletions

View File

@ -19,13 +19,14 @@ def iterable(x):
class Worker(object):
redis_worker_namespace_prefix = 'rq:worker:'
redis_workers_keys = 'rq:workers'
@classmethod
def all(cls):
"""Returns an iterable of all Workers.
"""
prefix = cls.redis_worker_namespace_prefix
return map(cls.from_worker_key, conn.keys('%s*' % prefix))
reported_working = conn.smembers(cls.redis_workers_keys)
return map(cls.from_worker_key, reported_working)
@classmethod
def from_worker_key(cls, worker_key):
@ -105,9 +106,14 @@ class Worker(object):
"""Registers its own birth."""
if conn.exists(self.key) and not conn.hexists(self.key, 'death'):
raise ValueError('There exists an active worker named \'%s\' alread.' % (self.name,))
key = self.key
now = time.time()
queues = ','.join(self.queue_names())
with conn.pipeline() as p:
p.delete(self.key)
p.hset(self.key, 'birth', time.time())
p.delete(key)
p.hset(key, 'birth', now)
p.hset(key, 'queues', queues)
p.sadd(self.redis_workers_keys, key)
p.execute()
def register_death(self):
@ -116,7 +122,7 @@ class Worker(object):
with conn.pipeline() as p:
# We cannot use self.state = 'dead' here, because that would
# rollback the pipeline
p.hset(self.key, 'state', 'dead')
p.srem(self.redis_workers_keys, self.key)
p.hset(self.key, 'death', time.time())
p.expire(self.key, 60)
p.execute()