From e61d1505bcb8765ab1cbecc17ea243d44cb587db Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Tue, 29 Jul 2014 18:26:24 +0700 Subject: [PATCH 1/7] Added WorkingQueue class. --- rq/utils.py | 6 +++++ rq/working_queue.py | 38 ++++++++++++++++++++++++++++++++ tests/test_working_queue.py | 44 +++++++++++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+) create mode 100644 rq/working_queue.py create mode 100644 tests/test_working_queue.py diff --git a/rq/utils.py b/rq/utils.py index d85235ef..885e5b0a 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -8,6 +8,7 @@ terminal colorizing code, originally by Georg Brandl. from __future__ import (absolute_import, division, print_function, unicode_literals) +import calendar import importlib import datetime import logging @@ -229,3 +230,8 @@ def first(iterable, default=None, key=None): return el return default + + +def current_timestamp(): + """Returns current UTC timestamp""" + return calendar.timegm(datetime.datetime.utcnow().utctimetuple()) diff --git a/rq/working_queue.py b/rq/working_queue.py new file mode 100644 index 00000000..3e5da458 --- /dev/null +++ b/rq/working_queue.py @@ -0,0 +1,38 @@ +from .connections import resolve_connection +from .queue import FailedQueue +from .utils import current_timestamp + + +class WorkingQueue: + """ + Registry of currently executing jobs. Each queue maintains a WorkingQueue. + WorkingQueue contains job keys that are currently being executed. + Each key is scored by job's expiration time (datetime started + timeout). + + Jobs are added to registry right before they are executed and removed + right after completion (success or failure). + + Jobs whose score are lower than current time is considered "expired". + """ + + def __init__(self, name, connection=None): + self.name = name + self.key = 'rq:wip:%s' % name + self.connection = resolve_connection(connection) + + def add(self, job, timeout): + """Adds a job to WorkingQueue with expiry time of now + timeout.""" + return self.connection._zadd(self.key, current_timestamp() + timeout, + job.id) + + def remove(self, job): + return self.connection.zrem(self.key, job.id) + + def get_expired_job_ids(self): + """Returns job ids whose score are less than current timestamp.""" + return self.connection.zrangebyscore(self.key, 0, current_timestamp()) + + def get_job_ids(self, start=0, end=-1): + """Returns list of all job ids.""" + return self.connection.zrange(self.key, start, end) + diff --git a/tests/test_working_queue.py b/tests/test_working_queue.py new file mode 100644 index 00000000..9c1a1e5f --- /dev/null +++ b/tests/test_working_queue.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +from rq.job import Job +from rq.utils import current_timestamp +from rq.working_queue import WorkingQueue + +from tests import RQTestCase + + +class TestQueue(RQTestCase): + + def setUp(self): + super(TestQueue, self).setUp() + self.working_queue = WorkingQueue('default', connection=self.testconn) + + def test_add_and_remove(self): + """Adding and removing job to WorkingQueue.""" + timestamp = current_timestamp() + job = Job() + + # Test that job is added with the right score + self.working_queue.add(job, 1000) + self.assertLess(self.testconn.zscore(self.working_queue.key, job.id), + timestamp + 1001) + + # Ensure that job is properly removed from sorted set + self.working_queue.remove(job) + self.assertIsNone(self.testconn.zscore(self.working_queue.key, job.id)) + + def test_get_job_ids(self): + """Getting job ids from WorkingQueue.""" + self.testconn.zadd(self.working_queue.key, 1, 'foo') + self.testconn.zadd(self.working_queue.key, 10, 'bar') + self.assertEqual(self.working_queue.get_job_ids(), ['foo', 'bar']) + + def test_get_expired_job_ids(self): + """Getting expired job ids form WorkingQueue.""" + timestamp = current_timestamp() + + self.testconn.zadd(self.working_queue.key, 1, 'foo') + self.testconn.zadd(self.working_queue.key, timestamp + 10, 'bar') + + self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo']) \ No newline at end of file From 90c7eeb1115b10d5620a7fa3fcd0d7e0332a471d Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Tue, 29 Jul 2014 18:37:22 +0700 Subject: [PATCH 2/7] Implemented WorkingQueue.cleanup(). --- rq/queue.py | 5 +++-- rq/working_queue.py | 12 ++++++++++++ tests/test_working_queue.py | 13 ++++++++++++- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 22aa1608..a74e6863 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -158,9 +158,10 @@ class Queue(object): if self.job_class.exists(job_id, self.connection): self.connection.rpush(self.key, job_id) - def push_job_id(self, job_id): + def push_job_id(self, job_id, pipeline=None): """Pushes a job ID on the corresponding Redis queue.""" - self.connection.rpush(self.key, job_id) + connection = pipeline if pipeline is not None else self.connection + connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None, description=None, depends_on=None): diff --git a/rq/working_queue.py b/rq/working_queue.py index 3e5da458..64cf1e90 100644 --- a/rq/working_queue.py +++ b/rq/working_queue.py @@ -36,3 +36,15 @@ class WorkingQueue: """Returns list of all job ids.""" return self.connection.zrange(self.key, start, end) + def cleanup(self): + """Removes expired job ids to FailedQueue.""" + job_ids = self.get_expired_job_ids() + + if job_ids: + failed_queue = FailedQueue(connection=self.connection) + with self.connection.pipeline() as pipeline: + for job_id in job_ids: + failed_queue.push_job_id(job_id, pipeline=pipeline) + pipeline.execute() + + return job_ids diff --git a/tests/test_working_queue.py b/tests/test_working_queue.py index 9c1a1e5f..af6eeb49 100644 --- a/tests/test_working_queue.py +++ b/tests/test_working_queue.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from rq.job import Job +from rq.queue import FailedQueue from rq.utils import current_timestamp from rq.working_queue import WorkingQueue @@ -41,4 +42,14 @@ class TestQueue(RQTestCase): self.testconn.zadd(self.working_queue.key, 1, 'foo') self.testconn.zadd(self.working_queue.key, timestamp + 10, 'bar') - self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo']) \ No newline at end of file + self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo']) + + def test_cleanup(self): + """Moving expired jobs to FailedQueue.""" + failed_queue = FailedQueue(connection=self.testconn) + self.assertTrue(failed_queue.is_empty()) + self.testconn.zadd(self.working_queue.key, 1, 'foo') + self.working_queue.cleanup() + self.assertIn('foo', failed_queue.job_ids) + + \ No newline at end of file From f38d0dc79141432bf417a07d8de23f5092d3cacc Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Wed, 30 Jul 2014 13:36:35 +0800 Subject: [PATCH 3/7] Moved some logic into worker.prepare_job_execution to make things testable. --- rq/worker.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 59c1624c..2731c98e 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -23,6 +23,7 @@ from .queue import get_failed_queue, Queue from .timeouts import UnixSignalDeathPenalty from .utils import import_attribute, make_colorizer, utcformat, utcnow from .version import VERSION +from .working_queue import WorkingQueue try: from procname import setprocname @@ -403,7 +404,7 @@ class Worker(object): self.heartbeat() return result - def heartbeat(self, timeout=0): + def heartbeat(self, timeout=0, pipeline=None): """Specifies a new worker timeout, typically by extending the expiration time of the worker, effectively making this a "heartbeat" to not expire the worker until the timeout passes. @@ -415,7 +416,8 @@ class Worker(object): only larger. """ timeout = max(timeout, self.default_worker_ttl) - self.connection.expire(self.key, timeout) + connection = pipeline if pipeline is not None else self.connection + connection.expire(self.key, timeout) self.log.debug('Sent heartbeat to prevent worker timeout. ' 'Next one should arrive within {0} seconds.'.format(timeout)) @@ -468,19 +470,26 @@ class Worker(object): # constrast to the regular sys.exit() os._exit(int(not success)) - def perform_job(self, job): - """Performs the actual work of a job. Will/should only be called - inside the work horse's process. + def prepare_job_execution(self, job): + """Performs misc bookkeeping like updating states prior to + job execution. """ - - self.set_state('busy') - self.set_current_job_id(job.id) - self.heartbeat((job.timeout or 180) + 60) + with self.connection._pipeline() as pipeline: + self.set_state('busy', pipeline=pipeline) + self.set_current_job_id(job.id, pipeline=pipeline) + self.heartbeat((job.timeout or 180) + 60, pipeline=pipeline) + pipeline.execute() self.procline('Processing %s from %s since %s' % ( job.func_name, job.origin, time.time())) + def perform_job(self, job): + """Performs the actual work of a job. Will/should only be called + inside the work horse's process. + """ + self.prepare_job_execution(job) + with self.connection._pipeline() as pipeline: try: job.set_status(Status.STARTED) From 893fc5a6ae2171f4cd574c0e6ed4483c151ed61a Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Wed, 30 Jul 2014 16:06:39 +0800 Subject: [PATCH 4/7] Add job to WorkingQueue before execution and remove from WorkingQueue after. --- rq/worker.py | 13 ++++++++++--- rq/working_queue.py | 16 ++++++++++------ tests/test_worker.py | 16 ++++++++++++++++ tests/test_working_queue.py | 29 ++++++++++++++++++++++++++--- 4 files changed, 62 insertions(+), 12 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 2731c98e..41f17d14 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -475,9 +475,12 @@ class Worker(object): job execution. """ with self.connection._pipeline() as pipeline: + timeout = (job.timeout or 180) + 60 self.set_state('busy', pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline) - self.heartbeat((job.timeout or 180) + 60, pipeline=pipeline) + self.heartbeat(timeout, pipeline=pipeline) + working_queue = WorkingQueue(job.origin, self.connection) + working_queue.add(job, timeout, pipeline=pipeline) pipeline.execute() self.procline('Processing %s from %s since %s' % ( @@ -491,13 +494,15 @@ class Worker(object): self.prepare_job_execution(job) with self.connection._pipeline() as pipeline: + working_queue = WorkingQueue(job.origin, self.connection) + try: job.set_status(Status.STARTED) with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): rv = job.perform() - # Pickle the result in the same try-except block since we need to - # use the same exc handling when pickling fails + # Pickle the result in the same try-except block since we need + # to use the same exc handling when pickling fails job._result = rv self.set_current_job_id(None, pipeline=pipeline) @@ -508,12 +513,14 @@ class Worker(object): job._status = Status.FINISHED job.save(pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline) + working_queue.remove(job, pipeline=pipeline) pipeline.execute() except Exception: # Use the public setter here, to immediately update Redis job.set_status(Status.FAILED) + working_queue.remove(job) self.handle_exception(job, *sys.exc_info()) return False diff --git a/rq/working_queue.py b/rq/working_queue.py index 64cf1e90..8959384f 100644 --- a/rq/working_queue.py +++ b/rq/working_queue.py @@ -15,18 +15,22 @@ class WorkingQueue: Jobs whose score are lower than current time is considered "expired". """ - def __init__(self, name, connection=None): + def __init__(self, name='default', connection=None): self.name = name self.key = 'rq:wip:%s' % name self.connection = resolve_connection(connection) - def add(self, job, timeout): + def add(self, job, timeout, pipeline=None): """Adds a job to WorkingQueue with expiry time of now + timeout.""" - return self.connection._zadd(self.key, current_timestamp() + timeout, - job.id) + score = current_timestamp() + timeout + if pipeline is not None: + return pipeline.zadd(self.key, score, job.id) - def remove(self, job): - return self.connection.zrem(self.key, job.id) + return self.connection._zadd(self.key, score, job.id) + + def remove(self, job, pipeline=None): + connection = pipeline if pipeline is not None else self.connection + return connection.zrem(self.key, job.id) def get_expired_job_ids(self): """Returns job ids whose score are less than current timestamp.""" diff --git a/tests/test_worker.py b/tests/test_worker.py index e02ee2ac..91f95ea3 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -7,6 +7,7 @@ import os from rq import get_failed_queue, Queue, Worker from rq.compat import as_text from rq.job import Job, Status +from rq.working_queue import WorkingQueue from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero, @@ -277,3 +278,18 @@ class TestWorker(RQTestCase): q = Queue() worker = Worker([q], job_class=CustomJob) self.assertEqual(worker.job_class, CustomJob) + + def test_prepare_job_execution(self): + """Prepare job execution does the necessary bookkeeping.""" + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello) + worker = Worker([queue]) + worker.prepare_job_execution(job) + + # Updates working queue + working_queue = WorkingQueue(connection=self.testconn) + self.assertEqual(working_queue.get_job_ids(), [job.id]) + + # Updates worker statuses + self.assertEqual(worker.state, 'busy') + self.assertEqual(worker.get_current_job_id(), job.id) diff --git a/tests/test_working_queue.py b/tests/test_working_queue.py index af6eeb49..249c0358 100644 --- a/tests/test_working_queue.py +++ b/tests/test_working_queue.py @@ -2,18 +2,20 @@ from __future__ import absolute_import from rq.job import Job -from rq.queue import FailedQueue +from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp +from rq.worker import Worker from rq.working_queue import WorkingQueue from tests import RQTestCase +from tests.fixtures import div_by_zero, say_hello class TestQueue(RQTestCase): def setUp(self): super(TestQueue, self).setUp() - self.working_queue = WorkingQueue('default', connection=self.testconn) + self.working_queue = WorkingQueue(connection=self.testconn) def test_add_and_remove(self): """Adding and removing job to WorkingQueue.""" @@ -52,4 +54,25 @@ class TestQueue(RQTestCase): self.working_queue.cleanup() self.assertIn('foo', failed_queue.job_ids) - \ No newline at end of file + def test_job_execution(self): + """Job is removed from WorkingQueue after execution.""" + working_queue = WorkingQueue(connection=self.testconn) + queue = Queue(connection=self.testconn) + worker = Worker([queue]) + + job = queue.enqueue(say_hello) + + worker.prepare_job_execution(job) + self.assertIn(job.id, working_queue.get_job_ids()) + + worker.perform_job(job) + self.assertNotIn(job.id, working_queue.get_job_ids()) + + # Job that fails + job = queue.enqueue(div_by_zero) + + worker.prepare_job_execution(job) + self.assertIn(job.id, working_queue.get_job_ids()) + + worker.perform_job(job) + self.assertNotIn(job.id, working_queue.get_job_ids()) From d667fb0713c9e20256c565f2d601967b9600aedb Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sun, 7 Sep 2014 17:03:17 +0700 Subject: [PATCH 5/7] working_queue.remove call should be pipelined. --- rq/job.py | 5 +++-- rq/worker.py | 12 +++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/rq/job.py b/rq/job.py index ef7a2662..bd91f137 100644 --- a/rq/job.py +++ b/rq/job.py @@ -147,9 +147,10 @@ class Job(object): ) return self.get_status() - def set_status(self, status): + def set_status(self, status, pipeline=None): self._status = status - self.connection.hset(self.key, 'status', self._status) + connection = pipeline if pipeline is not None else self.connection + connection.hset(self.key, 'status', self._status) def _set_status(self, status): warnings.warn( diff --git a/rq/worker.py b/rq/worker.py index 41f17d14..21ddac67 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -474,13 +474,15 @@ class Worker(object): """Performs misc bookkeeping like updating states prior to job execution. """ + timeout = (job.timeout or 180) + 60 + with self.connection._pipeline() as pipeline: - timeout = (job.timeout or 180) + 60 self.set_state('busy', pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline) self.heartbeat(timeout, pipeline=pipeline) working_queue = WorkingQueue(job.origin, self.connection) working_queue.add(job, timeout, pipeline=pipeline) + job.set_status(Status.STARTED, pipeline=pipeline) pipeline.execute() self.procline('Processing %s from %s since %s' % ( @@ -497,7 +499,6 @@ class Worker(object): working_queue = WorkingQueue(job.origin, self.connection) try: - job.set_status(Status.STARTED) with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): rv = job.perform() @@ -518,9 +519,10 @@ class Worker(object): pipeline.execute() except Exception: - # Use the public setter here, to immediately update Redis - job.set_status(Status.FAILED) - working_queue.remove(job) + job.set_status(Status.FAILED, pipeline=pipeline) + working_queue.remove(job, pipeline=pipeline) + pipeline.execute() + self.handle_exception(job, *sys.exc_info()) return False From 1158a0606cdfaa4a3e9829decb8f3d1d94b17ea8 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Mon, 8 Sep 2014 22:35:18 +0700 Subject: [PATCH 6/7] Fixed Python 3 tests for "WorkingQueue". --- rq/working_queue.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/rq/working_queue.py b/rq/working_queue.py index 8959384f..6dc6826f 100644 --- a/rq/working_queue.py +++ b/rq/working_queue.py @@ -1,3 +1,4 @@ +from .compat import as_text from .connections import resolve_connection from .queue import FailedQueue from .utils import current_timestamp @@ -34,11 +35,13 @@ class WorkingQueue: def get_expired_job_ids(self): """Returns job ids whose score are less than current timestamp.""" - return self.connection.zrangebyscore(self.key, 0, current_timestamp()) + return [as_text(job_id) for job_id in + self.connection.zrangebyscore(self.key, 0, current_timestamp())] def get_job_ids(self, start=0, end=-1): """Returns list of all job ids.""" - return self.connection.zrange(self.key, start, end) + return [as_text(job_id) for job_id in + self.connection.zrange(self.key, start, end)] def cleanup(self): """Removes expired job ids to FailedQueue.""" From 1047db0b3af844f1cc5cf62412da7b64fe712aed Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Mon, 8 Sep 2014 22:39:48 +0700 Subject: [PATCH 7/7] Renamed WorkingQueue to StartedJobRegistry. --- rq/{working_queue.py => registry.py} | 8 ++--- rq/worker.py | 12 ++++---- tests/test_worker.py | 6 ++-- tests/test_working_queue.py | 46 ++++++++++++++-------------- 4 files changed, 36 insertions(+), 36 deletions(-) rename rq/{working_queue.py => registry.py} (90%) diff --git a/rq/working_queue.py b/rq/registry.py similarity index 90% rename from rq/working_queue.py rename to rq/registry.py index 6dc6826f..2bf14453 100644 --- a/rq/working_queue.py +++ b/rq/registry.py @@ -4,10 +4,10 @@ from .queue import FailedQueue from .utils import current_timestamp -class WorkingQueue: +class StartedJobRegistry: """ - Registry of currently executing jobs. Each queue maintains a WorkingQueue. - WorkingQueue contains job keys that are currently being executed. + Registry of currently executing jobs. Each queue maintains a StartedJobRegistry. + StartedJobRegistry contains job keys that are currently being executed. Each key is scored by job's expiration time (datetime started + timeout). Jobs are added to registry right before they are executed and removed @@ -22,7 +22,7 @@ class WorkingQueue: self.connection = resolve_connection(connection) def add(self, job, timeout, pipeline=None): - """Adds a job to WorkingQueue with expiry time of now + timeout.""" + """Adds a job to StartedJobRegistry with expiry time of now + timeout.""" score = current_timestamp() + timeout if pipeline is not None: return pipeline.zadd(self.key, score, job.id) diff --git a/rq/worker.py b/rq/worker.py index 21ddac67..2a98f890 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -23,7 +23,7 @@ from .queue import get_failed_queue, Queue from .timeouts import UnixSignalDeathPenalty from .utils import import_attribute, make_colorizer, utcformat, utcnow from .version import VERSION -from .working_queue import WorkingQueue +from .registry import StartedJobRegistry try: from procname import setprocname @@ -480,8 +480,8 @@ class Worker(object): self.set_state('busy', pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline) self.heartbeat(timeout, pipeline=pipeline) - working_queue = WorkingQueue(job.origin, self.connection) - working_queue.add(job, timeout, pipeline=pipeline) + registry = StartedJobRegistry(job.origin, self.connection) + registry.add(job, timeout, pipeline=pipeline) job.set_status(Status.STARTED, pipeline=pipeline) pipeline.execute() @@ -496,7 +496,7 @@ class Worker(object): self.prepare_job_execution(job) with self.connection._pipeline() as pipeline: - working_queue = WorkingQueue(job.origin, self.connection) + registry = StartedJobRegistry(job.origin, self.connection) try: with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): @@ -514,13 +514,13 @@ class Worker(object): job._status = Status.FINISHED job.save(pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline) - working_queue.remove(job, pipeline=pipeline) + registry.remove(job, pipeline=pipeline) pipeline.execute() except Exception: job.set_status(Status.FAILED, pipeline=pipeline) - working_queue.remove(job, pipeline=pipeline) + registry.remove(job, pipeline=pipeline) pipeline.execute() self.handle_exception(job, *sys.exc_info()) diff --git a/tests/test_worker.py b/tests/test_worker.py index 91f95ea3..901a890f 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -7,7 +7,7 @@ import os from rq import get_failed_queue, Queue, Worker from rq.compat import as_text from rq.job import Job, Status -from rq.working_queue import WorkingQueue +from rq.registry import StartedJobRegistry from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero, @@ -287,8 +287,8 @@ class TestWorker(RQTestCase): worker.prepare_job_execution(job) # Updates working queue - working_queue = WorkingQueue(connection=self.testconn) - self.assertEqual(working_queue.get_job_ids(), [job.id]) + registry = StartedJobRegistry(connection=self.testconn) + self.assertEqual(registry.get_job_ids(), [job.id]) # Updates worker statuses self.assertEqual(worker.state, 'busy') diff --git a/tests/test_working_queue.py b/tests/test_working_queue.py index 249c0358..9830f221 100644 --- a/tests/test_working_queue.py +++ b/tests/test_working_queue.py @@ -5,7 +5,7 @@ from rq.job import Job from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp from rq.worker import Worker -from rq.working_queue import WorkingQueue +from rq.registry import StartedJobRegistry from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello @@ -15,64 +15,64 @@ class TestQueue(RQTestCase): def setUp(self): super(TestQueue, self).setUp() - self.working_queue = WorkingQueue(connection=self.testconn) + self.registry = StartedJobRegistry(connection=self.testconn) def test_add_and_remove(self): - """Adding and removing job to WorkingQueue.""" + """Adding and removing job to StartedJobRegistry.""" timestamp = current_timestamp() job = Job() # Test that job is added with the right score - self.working_queue.add(job, 1000) - self.assertLess(self.testconn.zscore(self.working_queue.key, job.id), + self.registry.add(job, 1000) + self.assertLess(self.testconn.zscore(self.registry.key, job.id), timestamp + 1001) # Ensure that job is properly removed from sorted set - self.working_queue.remove(job) - self.assertIsNone(self.testconn.zscore(self.working_queue.key, job.id)) + self.registry.remove(job) + self.assertIsNone(self.testconn.zscore(self.registry.key, job.id)) def test_get_job_ids(self): - """Getting job ids from WorkingQueue.""" - self.testconn.zadd(self.working_queue.key, 1, 'foo') - self.testconn.zadd(self.working_queue.key, 10, 'bar') - self.assertEqual(self.working_queue.get_job_ids(), ['foo', 'bar']) + """Getting job ids from StartedJobRegistry.""" + self.testconn.zadd(self.registry.key, 1, 'foo') + self.testconn.zadd(self.registry.key, 10, 'bar') + self.assertEqual(self.registry.get_job_ids(), ['foo', 'bar']) def test_get_expired_job_ids(self): - """Getting expired job ids form WorkingQueue.""" + """Getting expired job ids form StartedJobRegistry.""" timestamp = current_timestamp() - self.testconn.zadd(self.working_queue.key, 1, 'foo') - self.testconn.zadd(self.working_queue.key, timestamp + 10, 'bar') + self.testconn.zadd(self.registry.key, 1, 'foo') + self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') - self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo']) + self.assertEqual(self.registry.get_expired_job_ids(), ['foo']) def test_cleanup(self): """Moving expired jobs to FailedQueue.""" failed_queue = FailedQueue(connection=self.testconn) self.assertTrue(failed_queue.is_empty()) - self.testconn.zadd(self.working_queue.key, 1, 'foo') - self.working_queue.cleanup() + self.testconn.zadd(self.registry.key, 1, 'foo') + self.registry.cleanup() self.assertIn('foo', failed_queue.job_ids) def test_job_execution(self): - """Job is removed from WorkingQueue after execution.""" - working_queue = WorkingQueue(connection=self.testconn) + """Job is removed from StartedJobRegistry after execution.""" + registry = StartedJobRegistry(connection=self.testconn) queue = Queue(connection=self.testconn) worker = Worker([queue]) job = queue.enqueue(say_hello) worker.prepare_job_execution(job) - self.assertIn(job.id, working_queue.get_job_ids()) + self.assertIn(job.id, registry.get_job_ids()) worker.perform_job(job) - self.assertNotIn(job.id, working_queue.get_job_ids()) + self.assertNotIn(job.id, registry.get_job_ids()) # Job that fails job = queue.enqueue(div_by_zero) worker.prepare_job_execution(job) - self.assertIn(job.id, working_queue.get_job_ids()) + self.assertIn(job.id, registry.get_job_ids()) worker.perform_job(job) - self.assertNotIn(job.id, working_queue.get_job_ids()) + self.assertNotIn(job.id, registry.get_job_ids())