Resolve connections early.

Fixes #101.
This commit is contained in:
Vincent Driessen 2012-07-24 12:33:22 +02:00
parent c7225ba257
commit d697ddb93a
4 changed files with 29 additions and 16 deletions

View File

@ -51,6 +51,20 @@ def get_current_connection():
return _connection_stack.top
def resolve_connection(connection=None):
"""Convenience function to resolve the given or the current connection.
Raises an exception if it cannot resolve a connection now.
"""
if connection is not None:
return connection
connection = get_current_connection()
if connection is None:
raise NoRedisConnectionException(
'Could not resolve a Redis connection.')
return connection
_connection_stack = LocalStack()
__all__ = ['Connection',

View File

@ -1,14 +1,15 @@
from functools import wraps
from .queue import Queue
from .connections import resolve_connection
class job(object):
def __init__(self, queue, connection=None, timeout=None):
"""A decorator that adds a ``delay`` method to the decorated function,
which in turn creates a RQ job when called. Accepts a required ``queue``
argument that can be either a ``Queue`` instance or a string denoting
the queue name. For example:
which in turn creates a RQ job when called. Accepts a required
``queue`` argument that can be either a ``Queue`` instance or a string
denoting the queue name. For example:
@job(queue='default')
def simple_add(x, y):
@ -17,7 +18,7 @@ class job(object):
simple_add.delay(1, 2) # Puts simple_add function into queue
"""
self.queue = queue
self.connection = connection
self.connection = resolve_connection(connection)
self.timeout = timeout
def __call__(self, f):

View File

@ -1,5 +1,5 @@
import times
from .connections import get_current_connection
from .connections import resolve_connection
from .job import Job
from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError
from .compat import total_ordering
@ -23,8 +23,7 @@ class Queue(object):
"""Returns an iterable of all Queues.
"""
prefix = cls.redis_queue_namespace_prefix
if connection is None:
connection = get_current_connection()
connection = resolve_connection(connection)
def to_queue(queue_key):
return cls.from_queue_key(queue_key, connection=connection)
@ -43,9 +42,7 @@ class Queue(object):
return cls(name, connection=connection)
def __init__(self, name='default', default_timeout=None, connection=None):
if connection is None:
connection = get_current_connection()
self.connection = connection
self.connection = resolve_connection(connection)
prefix = self.redis_queue_namespace_prefix
self.name = name
self._key = '%s%s' % (prefix, name)
@ -187,8 +184,7 @@ class Queue(object):
Until Redis receives a specific method for this, we'll have to wrap it
this way.
"""
if connection is None:
connection = get_current_connection()
connection = resolve_connection(connection)
if blocking:
queue_key, job_id = connection.blpop(queue_keys)
return queue_key, job_id

View File

@ -3,7 +3,7 @@ This file contains all jobs that are used in tests. Each of these test
fixtures has a slighty different characteristics.
"""
import time
from rq import Connection
from rq.decorators import job
@ -51,6 +51,8 @@ class Calculator(object):
def calculate(self, x, y):
return x * y / self.denominator
@job(queue='default')
def decorated_job(x, y):
return x + y
with Connection():
@job(queue='default')
def decorated_job(x, y):
return x + y