diff --git a/rq/__init__.py b/rq/__init__.py index c1258910..5c67d363 100644 --- a/rq/__init__.py +++ b/rq/__init__.py @@ -1,6 +1,7 @@ import uuid from pickle import loads, dumps -from rdb import conn +from .conn import current_connection, push_connection, pop_connection +from .queue import Queue def to_queue_key(queue_name): return 'rq:%s' % (queue_name,) @@ -13,7 +14,7 @@ class DelayedResult(object): @property def return_value(self): if self._rv is None: - rv = conn.get(self.key) + rv = current_connection().get(self.key) if rv is not None: # cache the result self._rv = loads(rv) @@ -31,10 +32,8 @@ class job(object): if f.__module__ == '__main__': raise ValueError('Functions from the __main__ module cannot be processed by workers.') s = dumps((f, key, args, kwargs)) - conn.rpush(queue_key, s) + current_connection().rpush(queue_key, s) return DelayedResult(key) f.delay = delay return f - - diff --git a/rq/conn.py b/rq/conn.py new file mode 100644 index 00000000..6ab9f518 --- /dev/null +++ b/rq/conn.py @@ -0,0 +1,18 @@ +from werkzeug.local import LocalStack + +class NoRedisConnectionException(Exception): + pass + +_conn = LocalStack() + +def push_connection(redis_conn): + _conn.push(redis_conn) + +def pop_connection(): + return _conn.pop() + +def current_connection(): + conn = _conn.top + if conn is None: + raise NoRedisConnectionException('Connect to Redis first.') + return conn diff --git a/rq/queue.py b/rq/queue.py index f5e015bc..a35ae3e1 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,3 +1,5 @@ +from . import current_connection + def to_queue_key(queue_name): return 'rq:%s' % (queue_name,) @@ -13,5 +15,9 @@ class Queue(object): def key(self): return self._key + @property + def empty(self): + return current_connection().llen(self.key) == 0 + def __str__(self): return self.name diff --git a/rq/worker.py b/rq/worker.py index 76636d17..99198ae5 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -5,8 +5,8 @@ import time import procname from logbook import Logger from pickle import loads, dumps -from rdb import conn from .queue import Queue +from .conn import current_connection class NoQueueError(Exception): pass @@ -45,7 +45,7 @@ class Worker(object): def work(self): while True: self.procline('Waiting on %s' % (', '.join(self.queue_names()),)) - queue, msg = conn.blpop(self.queue_keys()) + queue, msg = current_connection().blpop(self.queue_keys()) self.fork_and_perform_job(queue, msg) def fork_and_perform_job(self, queue, msg): @@ -80,7 +80,7 @@ class Worker(object): else: self.log.info('Job ended normally without result') if rv is not None: - p = conn.pipeline() - conn.set(key, dumps(rv)) - conn.expire(key, self.rv_ttl) + p = current_connection().pipeline() + p.set(key, dumps(rv)) + p.expire(key, self.rv_ttl) p.execute()