rq/tests/test_worker_registration.py

from unittest.mock import patch

from rq import Queue, Worker
from rq.utils import ceildiv
from rq.worker_registration import (
    REDIS_WORKER_KEYS,
    WORKERS_BY_QUEUE_KEY,
    clean_worker_registry,
    get_keys,
    register,
    unregister,
)
from tests import RQTestCase


class TestWorkerRegistry(RQTestCase):
    def test_worker_registration(self):
        """Ensure worker.key is correctly set in Redis."""
        foo_queue = Queue(name='foo', connection=self.connection)
        bar_queue = Queue(name='bar', connection=self.connection)
        worker = Worker([foo_queue, bar_queue], connection=self.connection)

        register(worker)
        redis = worker.connection

        self.assertTrue(redis.sismember(worker.redis_workers_keys, worker.key))
        self.assertEqual(Worker.count(connection=redis), 1)
        self.assertTrue(redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key))
        self.assertEqual(Worker.count(queue=foo_queue), 1)
        self.assertTrue(redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key))
        self.assertEqual(Worker.count(queue=bar_queue), 1)

        unregister(worker)
        self.assertFalse(redis.sismember(worker.redis_workers_keys, worker.key))
        self.assertFalse(redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key))
        self.assertFalse(redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key))

    def test_get_keys_by_queue(self):
        """get_keys(queue) only returns keys of workers registered for that queue."""
        foo_queue = Queue(name='foo', connection=self.connection)
        bar_queue = Queue(name='bar', connection=self.connection)
        baz_queue = Queue(name='baz', connection=self.connection)

        worker1 = Worker([foo_queue, bar_queue], connection=self.connection)
        worker2 = Worker([foo_queue], connection=self.connection)
        worker3 = Worker([baz_queue], connection=self.connection)

        # No workers are registered yet, so the queue has no keys
        self.assertEqual(set(), get_keys(foo_queue))

        register(worker1)
        register(worker2)
        register(worker3)

        # get_keys(queue) will return worker keys for that queue
        self.assertEqual(set([worker1.key, worker2.key]), get_keys(foo_queue))
        self.assertEqual(set([worker1.key]), get_keys(bar_queue))

        # get_keys(connection=connection) will return all worker keys
        self.assertEqual(set([worker1.key, worker2.key, worker3.key]), get_keys(connection=worker1.connection))

        # Calling get_keys without arguments raises an exception
        self.assertRaises(ValueError, get_keys)

        unregister(worker1)
        unregister(worker2)
        unregister(worker3)

    def test_clean_registry(self):
        """clean_worker_registry removes worker keys that don't exist in Redis"""
        queue = Queue(name='foo', connection=self.connection)
        worker = Worker([queue], connection=self.connection)

        register(worker)
        redis = worker.connection

        self.assertTrue(redis.sismember(worker.redis_workers_keys, worker.key))
        self.assertTrue(redis.sismember(REDIS_WORKER_KEYS, worker.key))

        clean_worker_registry(queue)
        self.assertFalse(redis.sismember(worker.redis_workers_keys, worker.key))
        self.assertFalse(redis.sismember(REDIS_WORKER_KEYS, worker.key))

    def test_clean_large_registry(self):
        """
        clean_worker_registry() splits invalid_keys into multiple lists for set removal to avoid sending more
        than Redis can receive
        """
        MAX_WORKERS = 41
        MAX_KEYS = 37
        # srem is called twice per invalid key group: once for WORKERS_BY_QUEUE_KEY; once for REDIS_WORKER_KEYS
        SREM_CALL_COUNT = 2

        queue = Queue(name='foo', connection=self.connection)
        for _ in range(MAX_WORKERS):
            worker = Worker([queue], connection=self.connection)
            register(worker)

        with patch('rq.worker_registration.MAX_KEYS', MAX_KEYS), patch.object(
            queue.connection, 'pipeline', wraps=queue.connection.pipeline
        ) as pipeline_mock:
            # clean_worker_registry creates a pipeline with a context manager. Configure the mock using the context
            # manager entry method __enter__
            pipeline_mock.return_value.__enter__.return_value.srem.return_value = None
            pipeline_mock.return_value.__enter__.return_value.execute.return_value = [0] * MAX_WORKERS

            clean_worker_registry(queue)

        expected_call_count = (ceildiv(MAX_WORKERS, MAX_KEYS)) * SREM_CALL_COUNT
        self.assertEqual(pipeline_mock.return_value.__enter__.return_value.srem.call_count, expected_call_count)
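
The expected call count in `test_clean_large_registry` follows from simple batching arithmetic. The sketch below reproduces that arithmetic without Redis. The local `ceildiv` and the `chunked` helper are illustrative stand-ins written for this example (they are not taken from `rq.utils` or `rq.worker_registration`), and the worker key strings are made up.

```python
def ceildiv(a: int, b: int) -> int:
    """Ceiling division using integer arithmetic only (local stand-in)."""
    return -(-a // b)


def chunked(items, size):
    """Yield consecutive slices of `items` with at most `size` elements (hypothetical helper)."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


MAX_WORKERS = 41
MAX_KEYS = 37
SREM_CALL_COUNT = 2  # one srem per registry set, as described in the test comment

# Made-up keys standing in for the invalid worker keys found during cleanup
invalid_keys = [f"worker-{i}" for i in range(MAX_WORKERS)]

batches = list(chunked(invalid_keys, MAX_KEYS))
batch_sizes = [len(batch) for batch in batches]

# Each batch triggers SREM_CALL_COUNT srem calls
expected_call_count = ceildiv(MAX_WORKERS, MAX_KEYS) * SREM_CALL_COUNT
```

With 41 keys and a batch limit of 37, the keys fall into two batches (37 and 4), so the test expects four `srem` calls in total.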