From 87da060648c5f650a0d412f130023aafd8c147d3 Mon Sep 17 00:00:00 2001 From: Andrew Nisbet Date: Tue, 20 Jun 2023 18:53:42 -0700 Subject: [PATCH 01/14] Add result blocking (#1939) * Add result blocking * Dev tidyup * Skip XREAD test on old redis versions * Lint * Clarify that the latest result is returned. * Fix job test ordering * Remove test latency hack * Readability improvements --- docs/docs/results.md | 7 +++++++ rq/job.py | 10 ++++++---- rq/results.py | 33 ++++++++++++++++++++++++--------- tests/test_job.py | 22 ++++++++++++++++++++-- tests/test_results.py | 28 ++++++++++++++++++++++++++++ 5 files changed, 85 insertions(+), 15 deletions(-) diff --git a/docs/docs/results.md b/docs/docs/results.md index b2a054ae..2a2c6d46 100644 --- a/docs/docs/results.md +++ b/docs/docs/results.md @@ -156,3 +156,10 @@ job = Job.fetch(id='my_id', connection=redis) for result in job.results(): print(result.created_at, result.type) ``` + +To block until a result arrives, you can pass a timeout in seconds to `job.latest_result()`. If any results already exist, the latest result is returned immediately. If the timeout is reached without a result arriving, a `None` object is returned. + +```python +job = queue.enqueue(sleep_for_10_seconds) +result = job.fetch_latest(timeout=60) # Will hang for about 10 seconds. +``` \ No newline at end of file diff --git a/rq/job.py b/rq/job.py index f2707b56..c7bc7a7a 100644 --- a/rq/job.py +++ b/rq/job.py @@ -807,7 +807,7 @@ class Job: return self._exc_info def return_value(self, refresh: bool = False) -> Optional[Any]: - """Returns the return value of the latest execution, if it was successful + """Returns the return value of the latest execution, if it was successful. Args: refresh (bool, optional): Whether to refresh the current status. Defaults to False. @@ -886,16 +886,18 @@ class Job: return Result.all(self, serializer=self.serializer) - def latest_result(self) -> Optional['Result']: + def latest_result(self, timeout: int = 0) -> Optional['Result']: """Get the latest job result. + Args: + timeout (int, optional): Number of seconds to block waiting for a result. Defaults to 0 (no blocking). + Returns: result (Result): The Result object """ - """Returns the latest Result object""" from .results import Result - return Result.fetch_latest(self, serializer=self.serializer) + return Result.fetch_latest(self, serializer=self.serializer, timeout=timeout) def restore(self, raw_data) -> Any: """Overwrite properties with the provided values stored in Redis. diff --git a/rq/results.py b/rq/results.py index 27bab15a..a5923880 100644 --- a/rq/results.py +++ b/rq/results.py @@ -143,16 +143,31 @@ class Result: return None @classmethod - def fetch_latest(cls, job: Job, serializer=None) -> Optional['Result']: - """Returns the latest result for given job instance or ID""" - # response = job.connection.zrevrangebyscore(cls.get_key(job.id), '+inf', '-inf', - # start=0, num=1, withscores=True) - response = job.connection.xrevrange(cls.get_key(job.id), '+', '-', count=1) - if not response: - return None + def fetch_latest(cls, job: Job, serializer=None, timeout: int = 0) -> Optional['Result']: + """Returns the latest result for given job instance or ID. - result_id, payload = response[0] - return cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer) + If a non-zero timeout is provided, block for a result until timeout is reached. + """ + if timeout: + # Unlike blpop, xread timeout is in miliseconds. "0-0" is the special value for the + # first item in the stream, like '-' for xrevrange. + timeout_ms = timeout * 1000 + response = job.connection.xread({cls.get_key(job.id): "0-0"}, block=timeout_ms) + if not response: + return None + response = response[0] # Querying single stream only. + response = response[1] # Xread also returns Result.id, which we don't need. + result_id, payload = response[-1] # Take most recent result. + + else: + # If not blocking, use xrevrange to load a single result (as xread will load them all). + response = job.connection.xrevrange(cls.get_key(job.id), '+', '-', count=1) + if not response: + return None + result_id, payload = response[0] + + res = cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer) + return res @classmethod def get_key(cls, job_id): diff --git a/tests/test_job.py b/tests/test_job.py index 29c309f2..83f9a9d9 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,11 +1,12 @@ import json import queue import time +import unittest import zlib from datetime import datetime, timedelta from pickle import dumps, loads -from redis import WatchError +from redis import Redis, WatchError from rq.defaults import CALLBACK_TIMEOUT from rq.exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError @@ -20,7 +21,7 @@ from rq.registry import ( StartedJobRegistry, ) from rq.serializers import JSONSerializer -from rq.utils import as_text, utcformat, utcnow +from rq.utils import as_text, get_version, utcformat, utcnow from rq.worker import Worker from tests import RQTestCase, fixtures @@ -1219,3 +1220,20 @@ class TestJob(RQTestCase): self.assertEqual(queue.count, 0) self.assertTrue(all(job.is_finished for job in [job_slow_1, job_slow_2, job_A, job_B])) self.assertEqual(jobs_completed, ["slow_1:w1", "B:w1", "slow_2:w2", "A"]) + + @unittest.skipIf(get_version(Redis()) < (5, 0, 0), 'Skip if Redis server < 5.0') + def test_blocking_result_fetch(self): + # Ensure blocking waits for the time to run the job, but not right up until the timeout. + job_sleep_seconds = 2 + block_seconds = 5 + queue_name = "test_blocking_queue" + q = Queue(queue_name) + job = q.enqueue(fixtures.long_running_job, job_sleep_seconds) + started_at = time.time() + fixtures.start_worker_process(queue_name, burst=True) + result = job.latest_result(timeout=block_seconds) + blocked_for = time.time() - started_at + self.assertEqual(job.get_status(), JobStatus.FINISHED) + self.assertIsNotNone(result) + self.assertGreaterEqual(blocked_for, job_sleep_seconds) + self.assertLess(blocked_for, block_seconds) diff --git a/tests/test_results.py b/tests/test_results.py index e27e872a..7d188c5c 100644 --- a/tests/test_results.py +++ b/tests/test_results.py @@ -1,4 +1,5 @@ import tempfile +import time import unittest from datetime import timedelta from unittest.mock import PropertyMock, patch @@ -249,3 +250,30 @@ class TestScheduledJobRegistry(RQTestCase): Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1) self.assertEqual(Result.count(job), 2) + + def test_blocking_results(self): + queue = Queue(connection=self.connection) + job = queue.enqueue(say_hello) + + # Should block if there's no result. + timeout = 1 + self.assertIsNone(Result.fetch_latest(job)) + started_at = time.time() + self.assertIsNone(Result.fetch_latest(job, timeout=timeout)) + blocked_for = time.time() - started_at + self.assertGreaterEqual(blocked_for, timeout) + + # Shouldn't block if there's already a result present. + Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1) + timeout = 1 + result_sync = Result.fetch_latest(job) + started_at = time.time() + result_blocking = Result.fetch_latest(job, timeout=timeout) + blocked_for = time.time() - started_at + self.assertEqual(result_sync.return_value, result_blocking.return_value) + self.assertGreater(timeout, blocked_for) + + # Should return the latest result if there are multiple. + Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=2) + result_blocking = Result.fetch_latest(job, timeout=1) + self.assertEqual(result_blocking.return_value, 2) From 7349032bbf1d2adb563985102f1009245d823abe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sim=C3=B3=20Albert=20i=20Beltran?= Date: Sat, 7 Oct 2023 02:09:32 +0200 Subject: [PATCH 02/14] fix_cleanup_ghosts_by_using_configured_worker_class (#1988) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simó Albert i Beltran --- rq/cli/cli.py | 2 +- rq/contrib/legacy.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 7f8b6d09..8b0602ae 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -256,7 +256,7 @@ def worker( setup_loghandlers_from_args(verbose, quiet, date_format, log_format) try: - cleanup_ghosts(cli_config.connection) + cleanup_ghosts(cli_config.connection, worker_class=cli_config.worker_class) exception_handlers = [] for h in exception_handler: exception_handlers.append(import_attribute(h)) diff --git a/rq/contrib/legacy.py b/rq/contrib/legacy.py index be44b65a..9362d697 100644 --- a/rq/contrib/legacy.py +++ b/rq/contrib/legacy.py @@ -5,7 +5,7 @@ from rq import Worker, get_current_connection logger = logging.getLogger(__name__) -def cleanup_ghosts(conn=None): +def cleanup_ghosts(conn=None, worker_class=Worker): """ RQ versions < 0.3.6 suffered from a race condition where workers, when abruptly terminated, did not have a chance to clean up their worker @@ -16,7 +16,7 @@ def cleanup_ghosts(conn=None): This function will clean up any of such legacy ghosted workers. """ conn = conn if conn else get_current_connection() - for worker in Worker.all(connection=conn): + for worker in worker_class.all(connection=conn): if conn.ttl(worker.key) == -1: ttl = worker.worker_ttl conn.expire(worker.key, ttl) From 2440db31f1caf4b600d89db4f4477853c31542ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Gyarmati?= Date: Fri, 19 Jan 2024 01:21:26 +0100 Subject: [PATCH 03/14] fix: cast redis server version from `float` to `str` (#2025) Currently, with Redis engine 7.1, and `redis` client 5.0.1 trying to enqueue a new job yields ```text AttributeError: 'float' object has no attribute 'split' ``` This is caused by the fact that `redis_conn.info("server")["redis_version"]` returns a `float`, not a `str`. --- rq/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/utils.py b/rq/utils.py index 8c341394..1adc8a75 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -290,7 +290,7 @@ def get_version(connection: 'Redis') -> Tuple[int, int, int]: setattr( connection, "__rq_redis_server_version", - tuple(int(i) for i in connection.info("server")["redis_version"].split('.')[:3]), + tuple(int(i) for i in str(connection.info("server")["redis_version"]).split('.')[:3]), ) return getattr(connection, "__rq_redis_server_version") except ResponseError: # fakeredis doesn't implement Redis' INFO command From 49f12451ba55232a038b9340fceb0f244b4251f4 Mon Sep 17 00:00:00 2001 From: Christofer Saputra <61144981+chromium7@users.noreply.github.com> Date: Tue, 13 Feb 2024 09:33:06 +0700 Subject: [PATCH 04/14] Set workerpool with_scheduler defaults to True (#2027) * Set workerpool with_scheduler defaults to True * Add delay before sending shutdown command * Black format * Delay a bit longer for test --- rq/worker.py | 18 ++++++++++-------- rq/worker_pool.py | 2 +- tests/test_worker_pool.py | 6 ++++-- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index d5b921ad..c921823b 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -156,15 +156,17 @@ class BaseWorker: self.serializer = resolve_serializer(serializer) queues = [ - self.queue_class( - name=q, - connection=connection, - job_class=self.job_class, - serializer=self.serializer, - death_penalty_class=self.death_penalty_class, + ( + self.queue_class( + name=q, + connection=connection, + job_class=self.job_class, + serializer=self.serializer, + death_penalty_class=self.death_penalty_class, + ) + if isinstance(q, str) + else q ) - if isinstance(q, str) - else q for q in ensure_list(queues) ] diff --git a/rq/worker_pool.py b/rq/worker_pool.py index b161cc8e..eb93065c 100644 --- a/rq/worker_pool.py +++ b/rq/worker_pool.py @@ -247,4 +247,4 @@ def run_worker( worker = worker_class(queues, name=worker_name, connection=connection, serializer=serializer, job_class=job_class) worker.log.info("Starting worker started with PID %s", os.getpid()) time.sleep(_sleep) - worker.work(burst=burst, logging_level=logging_level) + worker.work(burst=burst, with_scheduler=True, logging_level=logging_level) diff --git a/tests/test_worker_pool.py b/tests/test_worker_pool.py index 219b4a86..6c300c4e 100644 --- a/tests/test_worker_pool.py +++ b/tests/test_worker_pool.py @@ -43,9 +43,10 @@ class TestWorkerPool(TestCase): self.assertEqual(len(pool.worker_dict.keys()), 2) worker_data = list(pool.worker_dict.values())[0] + sleep(0.5) _send_shutdown_command(worker_data.name, self.connection.connection_pool.connection_kwargs.copy(), delay=0) # 1 worker should be dead since we sent a shutdown command - sleep(0.2) + sleep(0.75) pool.check_workers(respawn=False) self.assertEqual(len(pool.worker_dict.keys()), 1) @@ -65,9 +66,10 @@ class TestWorkerPool(TestCase): self.assertEqual(len(pool.worker_dict.keys()), 2) worker_data = list(pool.worker_dict.values())[0] + sleep(0.5) _send_shutdown_command(worker_data.name, self.connection.connection_pool.connection_kwargs.copy(), delay=0) # 1 worker should be dead since we sent a shutdown command - sleep(0.2) + sleep(0.75) pool.reap_workers() self.assertEqual(len(pool.worker_dict.keys()), 1) pool.stop_workers() From 1e6953b534264522df060476d0e5868e57264e15 Mon Sep 17 00:00:00 2001 From: Ethan Wolinsky <58117461+eswolinsky3241@users.noreply.github.com> Date: Tue, 18 Jul 2023 23:46:48 -0400 Subject: [PATCH 05/14] Fix bug with stopped callback in enqueue_many (#1954) * Fix bug with stopped callback in enqueue_many * Add test for callbacks enqueued using enqueue_many --- rq/queue.py | 1 + tests/test_callbacks.py | 12 ++++++++++++ 2 files changed, 13 insertions(+) diff --git a/rq/queue.py b/rq/queue.py index ab7d7d4c..17972ccd 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -818,6 +818,7 @@ class Queue: "retry": job_data.retry, "on_success": job_data.on_success, "on_failure": job_data.on_failure, + "on_stopped": job_data.on_stopped, } # Enqueue jobs without dependencies diff --git a/tests/test_callbacks.py b/tests/test_callbacks.py index 8aa9ad44..2dd29bdf 100644 --- a/tests/test_callbacks.py +++ b/tests/test_callbacks.py @@ -104,6 +104,18 @@ class QueueCallbackTestCase(RQTestCase): job = Job.fetch(id=job.id, connection=self.testconn) self.assertEqual(job.stopped_callback, print) + def test_enqueue_many_callback(self): + queue = Queue('example', connection=self.testconn) + + job_data = Queue.prepare_data( + func=say_hello, on_success=print, on_failure=save_exception, on_stopped=save_result_if_not_stopped + ) + + jobs = queue.enqueue_many([job_data]) + assert jobs[0].success_callback == job_data.on_success + assert jobs[0].failure_callback == job_data.on_failure + assert jobs[0].stopped_callback == job_data.on_stopped + class SyncJobCallback(RQTestCase): def test_success_callback(self): From efd4bd82d7d1ffd7d31ece1a5b21dd728b9d91f6 Mon Sep 17 00:00:00 2001 From: Ethan Wolinsky <58117461+eswolinsky3241@users.noreply.github.com> Date: Fri, 23 Feb 2024 21:44:44 -0500 Subject: [PATCH 06/14] Delete maintenance lock after registries cleaned (#2024) * Delete maintenance lock after registries cleaned * Fix test to assert that lock is released * Remove deprecated test case --- rq/queue.py | 4 ++++ rq/worker.py | 1 + tests/test_worker.py | 18 ++++++------------ 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 17972ccd..2dbecbda 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -253,6 +253,10 @@ class Queue: return False return lock_acquired + def release_maintenance_lock(self): + """Deletes the maintenance lock after registries have been cleaned""" + self.connection.delete(self.registry_cleaning_key) + def empty(self): """Removes all messages on the queue. This is currently being done using a Lua script, diff --git a/rq/worker.py b/rq/worker.py index c921823b..8fa185a4 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -318,6 +318,7 @@ class BaseWorker: clean_registries(queue) worker_registration.clean_worker_registry(queue) clean_intermediate_queue(self, queue) + queue.release_maintenance_lock() self.last_cleaned_at = utcnow() def get_redis_server_version(self): diff --git a/tests/test_worker.py b/tests/test_worker.py index 5784e343..8789a87c 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1029,14 +1029,8 @@ class TestWorker(RQTestCase): self.assertEqual(worker.last_cleaned_at, None) worker.clean_registries() self.assertNotEqual(worker.last_cleaned_at, None) - self.assertEqual(self.testconn.zcard(foo_registry.key), 0) - self.assertEqual(self.testconn.zcard(bar_registry.key), 0) - - # worker.clean_registries() only runs once every 15 minutes - # If we add another key, calling clean_registries() should do nothing - self.testconn.zadd(bar_registry.key, {'bar': 1}) - worker.clean_registries() - self.assertEqual(self.testconn.zcard(bar_registry.key), 1) + self.assertEqual(len(foo_registry), 0) + self.assertEqual(len(bar_registry), 0) def test_should_run_maintenance_tasks(self): """Workers should run maintenance tasks on startup and every hour.""" @@ -1059,13 +1053,13 @@ class TestWorker(RQTestCase): def test_worker_calls_clean_registries(self): """Worker calls clean_registries when run.""" - queue = Queue(connection=self.testconn) - registry = StartedJobRegistry(connection=self.testconn) + queue = Queue(connection=self.connection) + registry = StartedJobRegistry(connection=self.connection) self.testconn.zadd(registry.key, {'foo': 1}) - worker = Worker(queue, connection=self.testconn) + worker = Worker(queue, connection=self.connection) worker.work(burst=True) - self.assertEqual(self.testconn.zcard(registry.key), 0) + self.assertEqual(len(registry), 0) def test_job_dependency_race_condition(self): """Dependencies added while the job gets finished shouldn't get lost.""" From 0935f47213f550bb40a7b92cdc696856d4d3ca3b Mon Sep 17 00:00:00 2001 From: Rob Hudson Date: Sun, 9 Jul 2023 03:34:25 -0700 Subject: [PATCH 07/14] Store project metadata in pyproject.toml (PEP 621) (#1952) --- .github/workflows/dependencies.yml | 12 ++-- .github/workflows/docker.yml | 2 +- .github/workflows/workflow.yml | 7 +-- .pre-commit-config.yaml | 4 ++ MANIFEST.in | 4 -- dev-requirements-36.txt | 6 -- dev-requirements.txt | 6 -- pyproject.toml | 95 +++++++++++++++++++++++++++++- requirements.txt | 2 - setup.cfg | 6 -- setup.py | 88 --------------------------- tests/Dockerfile | 7 +-- tox.ini | 4 +- 13 files changed, 109 insertions(+), 134 deletions(-) delete mode 100644 MANIFEST.in delete mode 100644 dev-requirements-36.txt delete mode 100644 dev-requirements.txt delete mode 100644 requirements.txt delete mode 100644 setup.cfg delete mode 100644 setup.py diff --git a/.github/workflows/dependencies.yml b/.github/workflows/dependencies.yml index b1807f78..241369bc 100644 --- a/.github/workflows/dependencies.yml +++ b/.github/workflows/dependencies.yml @@ -33,13 +33,12 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip + pip install hatch pip install redis==${{ matrix.redis-py-version }} - pip install -r requirements.txt -r dev-requirements.txt - pip install -e . - name: Test with pytest run: | - RUN_SLOW_TESTS_TOO=1 pytest --durations=5 + RUN_SLOW_TESTS_TOO=1 hatch run test:pytest --durations=5 dependency-build: name: Check development branches of dependencies @@ -68,13 +67,10 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install git+https://github.com/redis/redis-py - pip install git+https://github.com/pallets/click - pip install -r dev-requirements.txt - pip install -e . + pip install hatch - name: Test with pytest - run: RUN_SLOW_TESTS_TOO=1 pytest --durations=5 > log.txt 2>&1 + run: RUN_SLOW_TESTS_TOO=1 hatch run test:pytest --durations=5 > log.txt 2>&1 - uses: actions/upload-artifact@v3 with: diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 7a9dda60..b4473767 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -8,7 +8,7 @@ on: permissions: - contents: read write # to fetch code (actions/checkout) + contents: write # to fetch code (actions/checkout) packages: write jobs: diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 1de525fa..8a3e532a 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -25,7 +25,7 @@ jobs: uses: addnab/docker-run-action@v3 with: image: rqtest-image:latest - run: stunnel & redis-server & RUN_SSL_TESTS=1 tox run -e ssl + run: stunnel & redis-server & RUN_SSL_TESTS=1 hatch run tox run -e ssl test: name: Python${{ matrix.python-version }}/Redis${{ matrix.redis-version }}/redis-py${{ matrix.redis-py-version }} @@ -53,13 +53,12 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip + pip install hatch pip install redis==${{ matrix.redis-py-version }} - pip install -r requirements.txt -r dev-requirements.txt - pip install -e . - name: Test with pytest run: | - RUN_SLOW_TESTS_TOO=1 pytest --cov=rq --cov-config=.coveragerc --cov-report=xml --durations=5 + RUN_SLOW_TESTS_TOO=1 hatch run test:cov --durations=5 - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d45026be..5bd0e359 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,3 +7,7 @@ repos: rev: "v0.0.267" hooks: - id: ruff + - repo: https://github.com/tox-dev/pyproject-fmt + rev: 0.12.0 + hooks: + - id: pyproject-fmt diff --git a/MANIFEST.in b/MANIFEST.in deleted file mode 100644 index cdd4ea5a..00000000 --- a/MANIFEST.in +++ /dev/null @@ -1,4 +0,0 @@ -include LICENSE -include *.toml -include requirements.txt -recursive-exclude tests * diff --git a/dev-requirements-36.txt b/dev-requirements-36.txt deleted file mode 100644 index 7eae078a..00000000 --- a/dev-requirements-36.txt +++ /dev/null @@ -1,6 +0,0 @@ -packaging==21.3 -coverage==6.2 -psutil -pytest -pytest-cov -sentry-sdk diff --git a/dev-requirements.txt b/dev-requirements.txt deleted file mode 100644 index cbe9464c..00000000 --- a/dev-requirements.txt +++ /dev/null @@ -1,6 +0,0 @@ -packaging -coverage -psutil -pytest -pytest-cov -sentry-sdk diff --git a/pyproject.toml b/pyproject.toml index ebcd7e8d..7bb68c0f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,97 @@ +[build-system] +build-backend = "hatchling.build" +requires = [ + "hatchling", +] + +[project] +name = "rq" +description = "RQ is a simple, lightweight, library for creating background jobs, and processing them." +readme = "README.md" +license = "BSD-2-Clause" +maintainers = [ + {name = "Selwin Ong"}, +] +authors = [ + { name = "Vincent Driessen", email = "vincent@3rdcloud.com" }, +] +requires-python = ">=3.7" +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Intended Audience :: End Users/Desktop", + "Intended Audience :: Information Technology", + "Intended Audience :: Science/Research", + "Intended Audience :: System Administrators", + "License :: OSI Approved :: BSD License", + "Operating System :: MacOS", + "Operating System :: POSIX", + "Operating System :: Unix", + "Programming Language :: Python", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Topic :: Internet", + "Topic :: Scientific/Engineering", + "Topic :: Software Development :: Libraries :: Python Modules", + "Topic :: System :: Distributed Computing", + "Topic :: System :: Monitoring", + "Topic :: System :: Systems Administration", +] +dynamic = [ + "version", +] +dependencies = [ + "click>=5", + "redis>=3.5", +] +[project.urls] +changelog = "https://github.com/rq/rq/blob/master/CHANGES.md" +documentation = "https://python-rq.org/docs/" +homepage = "https://python-rq.org/" +repository = "https://github.com/rq/rq/" +[project.scripts] +rq = "rq.cli:main" +rqinfo = "rq.cli:info" # TODO [v2]: Remove +rqworker = "rq.cli:worker" # TODO [v2]: Remove + +[tool.hatch.version] +path = "rq/version.py" + +[tool.hatch.build.targets.sdist] +include = [ + "/docs", + "/rq", + "/tests", + "CHANGES.md", + "LICENSE", + "pyproject.toml", + "README.md", + "requirements.txt", + "tox.ini", +] + +[tool.hatch.envs.test] +dependencies = [ + "black", + "coverage", + "packaging", + "psutil", + "pytest", + "pytest-cov", + "ruff", + "sentry-sdk", + "tox", +] +[tool.hatch.envs.test.scripts] +cov = "pytest --cov=rq --cov-config=.coveragerc --cov-report=xml {args:tests}" + [tool.black] line-length = 120 -target-version = ['py38'] +target-version = ["py38"] skip-string-normalization = true [tool.ruff] @@ -13,7 +104,7 @@ select = [ "W", # pycodestyle warnings ] line-length = 120 # To match black. -target-version = 'py38' +target-version = "py38" [tool.ruff.isort] known-first-party = ["rq"] diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 3a14dffe..00000000 --- a/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -redis>=4.0.0 -click>=5.0.0 diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index f873f9b9..00000000 --- a/setup.cfg +++ /dev/null @@ -1,6 +0,0 @@ -[bdist_rpm] -requires = redis >= 3.0.0 - click >= 3.0 - -[wheel] -universal = 1 diff --git a/setup.py b/setup.py deleted file mode 100644 index ceaf034d..00000000 --- a/setup.py +++ /dev/null @@ -1,88 +0,0 @@ -""" -rq is a simple, lightweight, library for creating background jobs, and -processing them. -""" -import os - -from setuptools import find_packages, setup - - -def get_version(): - basedir = os.path.dirname(__file__) - try: - with open(os.path.join(basedir, 'rq/version.py')) as f: - locals = {} - exec(f.read(), locals) - return locals['VERSION'] - except FileNotFoundError: - raise RuntimeError('No version info found.') - - -def get_requirements(): - basedir = os.path.dirname(__file__) - try: - with open(os.path.join(basedir, 'requirements.txt')) as f: - return f.readlines() - except FileNotFoundError: - raise RuntimeError('No requirements info found.') - - -setup( - name='rq', - version=get_version(), - url='https://github.com/nvie/rq/', - license='BSD', - author='Vincent Driessen', - author_email='vincent@3rdcloud.com', - description='RQ is a simple, lightweight, library for creating background jobs, and processing them.', - long_description=__doc__, - packages=find_packages(exclude=['tests', 'tests.*']), - package_data={"rq": ["py.typed"]}, - include_package_data=True, - zip_safe=False, - platforms='any', - install_requires=get_requirements(), - python_requires='>=3.6', - entry_points={ - 'console_scripts': [ - 'rq = rq.cli:main', - # NOTE: rqworker/rqinfo are kept for backward-compatibility, - # remove eventually (TODO) - 'rqinfo = rq.cli:info', - 'rqworker = rq.cli:worker', - ], - }, - classifiers=[ - # As from http://pypi.python.org/pypi?%3Aaction=list_classifiers - # 'Development Status :: 1 - Planning', - # 'Development Status :: 2 - Pre-Alpha', - # 'Development Status :: 3 - Alpha', - # 'Development Status :: 4 - Beta', - 'Development Status :: 5 - Production/Stable', - # 'Development Status :: 6 - Mature', - # 'Development Status :: 7 - Inactive', - 'Intended Audience :: Developers', - 'Intended Audience :: End Users/Desktop', - 'Intended Audience :: Information Technology', - 'Intended Audience :: Science/Research', - 'Intended Audience :: System Administrators', - 'License :: OSI Approved :: BSD License', - 'Operating System :: POSIX', - 'Operating System :: MacOS', - 'Operating System :: Unix', - 'Programming Language :: Python', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: 3.9', - 'Programming Language :: Python :: 3.10', - 'Programming Language :: Python :: 3.11', - 'Topic :: Software Development :: Libraries :: Python Modules', - 'Topic :: Internet', - 'Topic :: Scientific/Engineering', - 'Topic :: System :: Distributed Computing', - 'Topic :: System :: Systems Administration', - 'Topic :: System :: Monitoring', - ], -) diff --git a/tests/Dockerfile b/tests/Dockerfile index d131b7e8..77408855 100644 --- a/tests/Dockerfile +++ b/tests/Dockerfile @@ -41,11 +41,8 @@ WORKDIR /tmp/rq RUN set -e && \ python3 -m pip install --upgrade pip && \ - python3 -m pip install --no-cache-dir tox && \ - pip3 install -r /tmp/rq/requirements.txt -r /tmp/rq/dev-requirements.txt && \ - python3 /tmp/rq/setup.py build && \ - python3 /tmp/rq/setup.py install + python3 -m pip install --no-cache-dir hatch tox CMD stunnel \ & redis-server \ - & RUN_SLOW_TESTS_TOO=1 RUN_SSL_TESTS=1 tox + & RUN_SLOW_TESTS_TOO=1 RUN_SSL_TESTS=1 hatch run tox diff --git a/tox.ini b/tox.ini index a180d777..e89dcf18 100644 --- a/tox.ini +++ b/tox.ini @@ -4,11 +4,11 @@ envlist=py36,py37,py38,py39,py310 [testenv] commands=pytest --cov rq --cov-config=.coveragerc --durations=5 {posargs} deps= + codecov + psutil pytest pytest-cov sentry-sdk - codecov - psutil passenv= RUN_SSL_TESTS From 6ce1cc63b97b8ab017f18276d903de67cf45b65c Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 24 Feb 2024 10:10:21 +0700 Subject: [PATCH 08/14] Bump version to 1.16.0 --- CHANGES.md | 7 +++++++ docs/docs/results.md | 3 ++- pyproject.toml | 1 + rq/version.py | 2 +- 4 files changed, 11 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 0fe4198f..c1b5ec30 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,10 @@ +### RQ 1.16 (2024-02-24) +* Added a way for jobs to wait for latest result `job.latest_result(timeout=60)`. Thanks @ajnisbet! +* Fixed an issue where `stopped_callback` is not respected when job is enqueued via `enqueue_many()`. Thanks @eswolinsky3241! +* `worker-pool` no longer ignores `--quiet`. Thanks @Mindiell! +* Added compatibility with AWS Serverless Redis. Thanks @peter-gy! +* `worker-pool` now starts with scheduler. Thanks @chromium7! + ### RQ 1.15.1 (2023-06-20) * Fixed a bug that may cause a crash when cleaning intermediate queue. Thanks @selwin! * Fixed a bug that may cause canceled jobs to still run dependent jobs. Thanks @fredsod! diff --git a/docs/docs/results.md b/docs/docs/results.md index 2a2c6d46..80741fe8 100644 --- a/docs/docs/results.md +++ b/docs/docs/results.md @@ -157,9 +157,10 @@ for result in job.results(): print(result.created_at, result.type) ``` +_New in version 1.16.0._ To block until a result arrives, you can pass a timeout in seconds to `job.latest_result()`. If any results already exist, the latest result is returned immediately. If the timeout is reached without a result arriving, a `None` object is returned. ```python job = queue.enqueue(sleep_for_10_seconds) -result = job.fetch_latest(timeout=60) # Will hang for about 10 seconds. +result = job.latest_result(timeout=60) # Will hang for about 10 seconds. ``` \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 7bb68c0f..30509d67 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ classifiers = [ "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", "Topic :: Internet", "Topic :: Scientific/Engineering", "Topic :: Software Development :: Libraries :: Python Modules", diff --git a/rq/version.py b/rq/version.py index 3e4fa760..a51469e2 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1 @@ -VERSION = '1.15.1' +VERSION = '1.16.0' From 97b2d83164e5826ab29477cf71db5115a4108b27 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 24 Feb 2024 10:19:23 +0700 Subject: [PATCH 09/14] Minor test case change to trigger Github Actions --- tests/test_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_worker.py b/tests/test_worker.py index 8789a87c..70df393d 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1245,7 +1245,7 @@ class TestWorker(RQTestCase): def test_request_force_stop_ignores_consecutive_signals(self): """Ignore signals sent within 1 second of the last signal""" - queue = Queue(connection=self.testconn) + queue = Queue(connection=self.connection) worker = Worker([queue]) worker._horse_pid = 1 worker._shutdown_requested_date = utcnow() From e50b8f323d5325d8225a105dd15698864609610e Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 24 Feb 2024 10:24:33 +0700 Subject: [PATCH 10/14] Always run github actions on push --- .github/workflows/workflow.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 8a3e532a..20eb3811 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -1,8 +1,6 @@ name: Test on: - push: - branches: [ master ] pull_request: branches: [ master ] From 6ca0a299c80c0eabe62031110b036f2cd2e79f11 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 24 Feb 2024 10:29:23 +0700 Subject: [PATCH 11/14] Run on push to all branches --- .github/workflows/workflow.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 20eb3811..254bfecb 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -1,6 +1,8 @@ name: Test on: + push: + branches: [ ** ] pull_request: branches: [ master ] From b8d2750c175e0a5b904c4698442646d628bf56f4 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 24 Feb 2024 10:30:42 +0700 Subject: [PATCH 12/14] Workflow syntax --- .github/workflows/workflow.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 254bfecb..09a61b92 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -2,7 +2,9 @@ name: Test on: push: - branches: [ ** ] + branches: + - master + - ** pull_request: branches: [ master ] From e98509434e40a58c9a6362bd5a6175a9471e4ebe Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 24 Feb 2024 10:31:33 +0700 Subject: [PATCH 13/14] Workflow syntax 2 --- .github/workflows/workflow.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 09a61b92..f7553407 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -4,7 +4,7 @@ on: push: branches: - master - - ** + - '**' pull_request: branches: [ master ] From 34f83d637fea14e5afbae47305f80004d6ba6432 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 24 Feb 2024 11:50:40 +0700 Subject: [PATCH 14/14] Remove Python 3.6 from test matrix --- .github/workflows/workflow.yml | 43 ++-------------------------------- 1 file changed, 2 insertions(+), 41 deletions(-) diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index f7553407..c9aeba0d 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -35,7 +35,7 @@ jobs: timeout-minutes: 10 strategy: matrix: - python-version: ["3.7", "3.8.3", "3.9", "3.10", "3.11"] + python-version: ["3.7", "3.8.3", "3.9", "3.10", "3.11", "3.12"] redis-version: [3, 4, 5, 6, 7] redis-py-version: [3.5.0] @@ -66,43 +66,4 @@ jobs: uses: codecov/codecov-action@v3 with: file: ./coverage.xml - fail_ci_if_error: false - test-python-36: - name: Python${{ matrix.python-version }}/Redis${{ matrix.redis-version }}/redis-py${{ matrix.redis-py-version }} - runs-on: ubuntu-20.04 - timeout-minutes: 10 - strategy: - matrix: - python-version: ["3.6"] - redis-version: [3, 4, 5, 6, 7] - redis-py-version: [3.5.0] - - steps: - - uses: actions/checkout@v3 - - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4.6.1 - with: - python-version: ${{ matrix.python-version }} - - - name: Start Redis - uses: supercharge/redis-github-action@1.5.0 - with: - redis-version: ${{ matrix.redis-version }} - - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install redis==${{ matrix.redis-py-version }} - pip install -r requirements.txt -r dev-requirements-36.txt - pip install -e . - - - name: Test with pytest - run: | - RUN_SLOW_TESTS_TOO=1 pytest --cov=rq --cov-config=.coveragerc --cov-report=xml --durations=5 - - - name: Upload coverage to Codecov - uses: codecov/codecov-action@v3 - with: - file: ./coverage.xml - fail_ci_if_error: false + fail_ci_if_error: false \ No newline at end of file