diff --git a/rq/scheduler.py b/rq/scheduler.py index 97d627cc..175f6078 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -171,24 +171,24 @@ class RQScheduler: def heartbeat(self): """Updates the TTL on scheduler keys and the locks""" self.log.debug('Scheduler sending heartbeat to %s', ', '.join(self.acquired_locks)) - if len(self._queue_names) > 1: + if len(self._acquired_locks) > 1: with self.connection.pipeline() as pipeline: for name in self._acquired_locks: key = self.get_locking_key(name) pipeline.expire(key, self.interval + 60) pipeline.execute() - else: - key = self.get_locking_key(next(iter(self._queue_names))) + elif self._acquired_locks: + key = self.get_locking_key(next(iter(self._acquired_locks))) self.connection.expire(key, self.interval + 60) def stop(self): - self.log.info('Scheduler stopping, releasing locks for %s...', ', '.join(self._queue_names)) + self.log.info('Scheduler stopping, releasing locks for %s...', ', '.join(self._acquired_locks)) self.release_locks() self._status = self.Status.STOPPED def release_locks(self): """Release acquired locks""" - keys = [self.get_locking_key(name) for name in self._queue_names] + keys = [self.get_locking_key(name) for name in self._acquired_locks] self.connection.delete(*keys) self._acquired_locks = set() diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 8aa722a4..e6edf725 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -204,6 +204,33 @@ class TestScheduler(RQTestCase): self.assertEqual(mocked.call_count, 1) self.assertEqual(stopped_process.is_alive.call_count, 1) + def test_lock_release(self): + """Test that scheduler.release_locks() only releases acquired locks""" + name_1 = 'lock-test-1' + name_2 = 'lock-test-2' + scheduler_1 = RQScheduler([name_1], self.testconn) + + self.assertEqual(scheduler_1.acquire_locks(), {name_1}) + self.assertEqual(scheduler_1._acquired_locks, {name_1}) + + # Only name_2 is returned since name_1 is already locked + scheduler_1_2 = RQScheduler([name_1, name_2], self.testconn) + self.assertEqual(scheduler_1_2.acquire_locks(), {name_2}) + self.assertEqual(scheduler_1_2._acquired_locks, {name_2}) + + self.assertTrue(self.testconn.exists(scheduler_1.get_locking_key(name_1))) + self.assertTrue(self.testconn.exists(scheduler_1_2.get_locking_key(name_1))) + self.assertTrue(self.testconn.exists(scheduler_1_2.get_locking_key(name_2))) + + scheduler_1_2.release_locks() + + self.assertEqual(scheduler_1_2._acquired_locks, set()) + self.assertEqual(scheduler_1._acquired_locks, {name_1}) + + self.assertTrue(self.testconn.exists(scheduler_1.get_locking_key(name_1))) + self.assertTrue(self.testconn.exists(scheduler_1_2.get_locking_key(name_1))) + self.assertFalse(self.testconn.exists(scheduler_1_2.get_locking_key(name_2))) + def test_queue_scheduler_pid(self): queue = Queue(connection=self.testconn) scheduler = RQScheduler( @@ -219,25 +246,33 @@ class TestScheduler(RQTestCase): """Test that heartbeat updates locking keys TTL""" name_1 = 'lock-test-1' name_2 = 'lock-test-2' - scheduler = RQScheduler([name_1, name_2], self.testconn) + name_3 = 'lock-test-3' + scheduler = RQScheduler([name_3], self.testconn) + scheduler.acquire_locks() + scheduler = RQScheduler([name_1, name_2, name_3], self.testconn) scheduler.acquire_locks() locking_key_1 = RQScheduler.get_locking_key(name_1) locking_key_2 = RQScheduler.get_locking_key(name_2) + locking_key_3 = RQScheduler.get_locking_key(name_3) with self.testconn.pipeline() as pipeline: pipeline.expire(locking_key_1, 1000) pipeline.expire(locking_key_2, 1000) + pipeline.expire(locking_key_3, 1000) + pipeline.execute() scheduler.heartbeat() self.assertEqual(self.testconn.ttl(locking_key_1), 61) - self.assertEqual(self.testconn.ttl(locking_key_1), 61) + self.assertEqual(self.testconn.ttl(locking_key_2), 61) + self.assertEqual(self.testconn.ttl(locking_key_3), 1000) # scheduler.stop() releases locks and sets status to STOPPED scheduler._status = scheduler.Status.WORKING scheduler.stop() self.assertFalse(self.testconn.exists(locking_key_1)) self.assertFalse(self.testconn.exists(locking_key_2)) + self.assertTrue(self.testconn.exists(locking_key_3)) self.assertEqual(scheduler.status, scheduler.Status.STOPPED) # Heartbeat also works properly for schedulers with a single queue