From c9ba66bd5956b2a219708d22e872a72ef0a0092a Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 18 Nov 2011 17:32:43 +0100 Subject: [PATCH] Register workers in a central set ("rq:workers"). --- rq/worker.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 188b3aa2..bb34bafd 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -19,13 +19,14 @@ def iterable(x): class Worker(object): redis_worker_namespace_prefix = 'rq:worker:' + redis_workers_keys = 'rq:workers' @classmethod def all(cls): """Returns an iterable of all Workers. """ - prefix = cls.redis_worker_namespace_prefix - return map(cls.from_worker_key, conn.keys('%s*' % prefix)) + reported_working = conn.smembers(cls.redis_workers_keys) + return map(cls.from_worker_key, reported_working) @classmethod def from_worker_key(cls, worker_key): @@ -105,9 +106,14 @@ class Worker(object): """Registers its own birth.""" if conn.exists(self.key) and not conn.hexists(self.key, 'death'): raise ValueError('There exists an active worker named \'%s\' alread.' % (self.name,)) + key = self.key + now = time.time() + queues = ','.join(self.queue_names()) with conn.pipeline() as p: - p.delete(self.key) - p.hset(self.key, 'birth', time.time()) + p.delete(key) + p.hset(key, 'birth', now) + p.hset(key, 'queues', queues) + p.sadd(self.redis_workers_keys, key) p.execute() def register_death(self): @@ -116,7 +122,7 @@ class Worker(object): with conn.pipeline() as p: # We cannot use self.state = 'dead' here, because that would # rollback the pipeline - p.hset(self.key, 'state', 'dead') + p.srem(self.redis_workers_keys, self.key) p.hset(self.key, 'death', time.time()) p.expire(self.key, 60) p.execute()