diff --git a/CHANGES.md b/CHANGES.md index 5366f969..93cc7bca 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,12 +1,17 @@ ### 0.3.8 (not yet released) +- `rqworker` and `rqinfo` have a `--url` argument to connect to a Redis url. + - `rqworker` and `rqinfo` have a `--socket` option to connect to a Redis server through a Unix socket. - `rqworker` reads `SENTRY_DSN` from the environment, unless specifically provided on the command line. +- `Queue` has a new API that supports paging `get_jobs(3, 7)`, which will + return at most 7 jobs, starting from the 3rd. + ### 0.3.7 (February 26th, 2013) diff --git a/rq/queue.py b/rq/queue.py index 486af1a8..0e8bdb7c 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -66,25 +66,38 @@ class Queue(object): """Returns whether the current queue is empty.""" return self.count == 0 + def safe_fetch_job(self, job_id): + try: + job = Job.safe_fetch(job_id, connection=self.connection) + except NoSuchJobError: + self.remove(job_id) + return None + except UnpickleError: + return None + return job + + def get_job_ids(self, start=0, limit=-1): + """Returns a slice of job IDs in the queue.""" + if limit >= 0: + end = start + limit + else: + end = limit + return self.connection.lrange(self.key, start, end) + + def get_jobs(self, start=0, limit=-1): + """Returns a slice of jobs in the queue.""" + job_ids = self.get_job_ids(start, limit) + return compact([self.safe_fetch_job(job_id) for job_id in job_ids]) + @property def job_ids(self): """Returns a list of all job IDS in the queue.""" - return self.connection.lrange(self.key, 0, -1) + return self.get_job_ids() @property def jobs(self): """Returns a list of all (valid) jobs in the queue.""" - def safe_fetch(job_id): - try: - job = Job.safe_fetch(job_id, connection=self.connection) - except NoSuchJobError: - self.remove(job_id) - return None - except UnpickleError: - return None - return job - - return compact([safe_fetch(job_id) for job_id in self.job_ids]) + return self.get_jobs() @property def count(self): @@ -115,6 +128,7 @@ class Queue(object): """Pushes a job ID on the corresponding Redis queue.""" self.connection.rpush(self.key, job_id) + def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None, after=None): """Creates a job to represent the delayed function call and enqueues diff --git a/rq/scripts/__init__.py b/rq/scripts/__init__.py index 0e2ed977..fc31d092 100644 --- a/rq/scripts/__init__.py +++ b/rq/scripts/__init__.py @@ -7,7 +7,8 @@ def add_standard_arguments(parser): parser.add_argument('--config', '-c', default=None, help='Module containing RQ settings.') parser.add_argument('--url', '-u', default=None, - help='URL describing Redis connection details') + help='URL describing Redis connection details. ' + 'Overrides other connection arguments if supplied.') parser.add_argument('--host', '-H', default=None, help='The Redis hostname (default: localhost)') parser.add_argument('--port', '-p', default=None, @@ -47,13 +48,10 @@ def setup_default_arguments(args, settings): if args.password is None: args.password = settings.get('REDIS_PASSWORD', None) - if not args.queues: - args.queues = settings.get('QUEUES', ['default']) - def setup_redis(args): if args.url is not None: - redis_conn = redis.StrictRedis.from_url(args.url, db=args.db) + redis_conn = redis.StrictRedis.from_url(args.url) else: redis_conn = redis.StrictRedis(host=args.host, port=args.port, db=args.db, password=args.password, unix_socket_path=args.socket) diff --git a/rq/scripts/rqinfo.py b/rq/scripts/rqinfo.py index 7e036474..df16eb28 100755 --- a/rq/scripts/rqinfo.py +++ b/rq/scripts/rqinfo.py @@ -21,6 +21,7 @@ def pad(s, pad_to_length): """Pads the given string to the given length.""" return ('%-' + '%ds' % pad_to_length) % (s,) + def get_scale(x): """Finds the lowest scale where x <= scale.""" scales = [20, 50, 100, 200, 400, 600, 800, 1000] @@ -29,6 +30,7 @@ def get_scale(x): return scale return x + def state_symbol(state): symbols = { 'busy': red('busy'), @@ -186,4 +188,3 @@ def main(): except KeyboardInterrupt: print sys.exit(0) - diff --git a/rq/scripts/rqworker.py b/rq/scripts/rqworker.py index 19073eab..29505cae 100755 --- a/rq/scripts/rqworker.py +++ b/rq/scripts/rqworker.py @@ -32,6 +32,19 @@ def parse_args(): return parser.parse_args() +def setup_loghandlers_from_args(args): + if args.verbose and args.quiet: + raise RuntimeError("Flags --verbose and --quiet are mutually exclusive.") + + if args.verbose: + level = 'DEBUG' + elif args.quiet: + level = 'WARNING' + else: + level = 'INFO' + setup_loghandlers(level) + + def main(): args = parse_args() @@ -44,21 +57,15 @@ def main(): setup_default_arguments(args, settings) - # Other default arguments + # Worker specific default arguments + if not args.queues: + args.queues = settings.get('QUEUES', ['default']) + if args.sentry_dsn is None: args.sentry_dsn = settings.get('SENTRY_DSN', os.environ.get('SENTRY_DSN', None)) - if args.verbose and args.quiet: - raise RuntimeError("Flags --verbose and --quiet are mutually exclusive.") - - if args.verbose: - level = 'DEBUG' - elif args.quiet: - level = 'WARNING' - else: - level = 'INFO' - setup_loghandlers(level) + setup_loghandlers_from_args(args) setup_redis(args) cleanup_ghosts() diff --git a/tests/fixtures.py b/tests/fixtures.py index 77774d90..337fc70b 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -49,13 +49,16 @@ def access_self(): return job.id -class Calculator(object): - """Test instance methods.""" - def __init__(self, denominator): - self.denominator = denominator +class Number(object): + def __init__(self, value): + self.value = value - def calculate(self, x, y): - return x * y / self.denominator + @classmethod + def divide(cls, x, y): + return x * y + + def div(self, y): + return self.value / y with Connection(): diff --git a/tests/test_job.py b/tests/test_job.py index 83a6eae6..e15bf652 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,7 +1,7 @@ import times from datetime import datetime from tests import RQTestCase -from tests.fixtures import Calculator, some_calculation, say_hello, access_self +from tests.fixtures import Number, some_calculation, say_hello, access_self from tests.helpers import strip_milliseconds from cPickle import loads from rq.job import Job, get_current_job @@ -51,13 +51,13 @@ class TestJob(RQTestCase): def test_create_instance_method_job(self): """Creation of jobs for instance methods.""" - c = Calculator(2) - job = Job.create(func=c.calculate, args=(3, 4)) + n = Number(2) + job = Job.create(func=n.div, args=(4,)) # Job data is set - self.assertEquals(job.func, c.calculate) - self.assertEquals(job.instance, c) - self.assertEquals(job.args, (3, 4)) + self.assertEquals(job.func, n.div) + self.assertEquals(job.instance, n) + self.assertEquals(job.args, (4,)) def test_create_job_from_string_function(self): """Creation of jobs using string specifier.""" diff --git a/tests/test_queue.py b/tests/test_queue.py index 83a129e3..d1646e0e 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,5 +1,5 @@ from tests import RQTestCase -from tests.fixtures import Calculator, div_by_zero, say_hello, some_calculation +from tests.fixtures import Number, div_by_zero, say_hello, some_calculation from rq import Queue, get_failed_queue from rq.job import Job, Status from rq.exceptions import InvalidJobOperationError @@ -161,14 +161,26 @@ class TestQueue(RQTestCase): def test_dequeue_instance_method(self): """Dequeueing instance method jobs from queues.""" q = Queue() - c = Calculator(2) - result = q.enqueue(c.calculate, 3, 4) + n = Number(2) + q.enqueue(n.div, 4) job = q.dequeue() + # The instance has been pickled and unpickled, so it is now a separate # object. Test for equality using each object's __dict__ instead. - self.assertEquals(job.instance.__dict__, c.__dict__) - self.assertEquals(job.func.__name__, 'calculate') + self.assertEquals(job.instance.__dict__, n.__dict__) + self.assertEquals(job.func.__name__, 'div') + self.assertEquals(job.args, (4,)) + + def test_dequeue_class_method(self): + """Dequeueing class method jobs from queues.""" + q = Queue() + q.enqueue(Number.divide, 3, 4) + + job = q.dequeue() + + self.assertEquals(job.instance.__dict__, Number.__dict__) + self.assertEquals(job.func.__name__, 'divide') self.assertEquals(job.args, (3, 4)) def test_dequeue_ignores_nonexisting_jobs(self):