From 87da060648c5f650a0d412f130023aafd8c147d3 Mon Sep 17 00:00:00 2001
From: Andrew Nisbet
Date: Tue, 20 Jun 2023 18:53:42 -0700
Subject: [PATCH] Add result blocking (#1939)

* Add result blocking

* Dev tidyup

* Skip XREAD test on old redis versions

* Lint

* Clarify that the latest result is returned.

* Fix job test ordering

* Remove test latency hack

* Readability improvements
---
 docs/docs/results.md  |  7 +++++++
 rq/job.py             | 10 ++++++----
 rq/results.py         | 33 ++++++++++++++++++++++---------
 tests/test_job.py     | 22 ++++++++++++++++++++--
 tests/test_results.py | 28 ++++++++++++++++++++++++++++
 5 files changed, 85 insertions(+), 15 deletions(-)

diff --git a/docs/docs/results.md b/docs/docs/results.md
index b2a054ae..2a2c6d46 100644
--- a/docs/docs/results.md
+++ b/docs/docs/results.md
@@ -156,3 +156,10 @@ job = Job.fetch(id='my_id', connection=redis)
 for result in job.results():
     print(result.created_at, result.type)
 ```
+
+To block until a result arrives, you can pass a timeout in seconds to `job.latest_result()`. If any results already exist, the latest result is returned immediately. If the timeout is reached without a result arriving, `None` is returned.
+
+```python
+job = queue.enqueue(sleep_for_10_seconds)
+result = job.latest_result(timeout=60)  # Will hang for about 10 seconds.
+```
\ No newline at end of file
diff --git a/rq/job.py b/rq/job.py
index f2707b56..c7bc7a7a 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -807,7 +807,7 @@ class Job:
         return self._exc_info
 
     def return_value(self, refresh: bool = False) -> Optional[Any]:
-        """Returns the return value of the latest execution, if it was successful
+        """Returns the return value of the latest execution, if it was successful.
 
         Args:
             refresh (bool, optional): Whether to refresh the current status. Defaults to False.
@@ -886,16 +886,18 @@ class Job:
 
         return Result.all(self, serializer=self.serializer)
 
-    def latest_result(self) -> Optional['Result']:
+    def latest_result(self, timeout: int = 0) -> Optional['Result']:
         """Get the latest job result.
 
+        Args:
+            timeout (int, optional): Number of seconds to block waiting for a result. Defaults to 0 (no blocking).
+
         Returns:
             result (Result): The Result object
         """
-        """Returns the latest Result object"""
         from .results import Result
 
-        return Result.fetch_latest(self, serializer=self.serializer)
+        return Result.fetch_latest(self, serializer=self.serializer, timeout=timeout)
 
     def restore(self, raw_data) -> Any:
         """Overwrite properties with the provided values stored in Redis.
diff --git a/rq/results.py b/rq/results.py
index 27bab15a..a5923880 100644
--- a/rq/results.py
+++ b/rq/results.py
@@ -143,16 +143,31 @@ class Result:
         return None
 
     @classmethod
-    def fetch_latest(cls, job: Job, serializer=None) -> Optional['Result']:
-        """Returns the latest result for given job instance or ID"""
-        # response = job.connection.zrevrangebyscore(cls.get_key(job.id), '+inf', '-inf',
-        #                                            start=0, num=1, withscores=True)
-        response = job.connection.xrevrange(cls.get_key(job.id), '+', '-', count=1)
-        if not response:
-            return None
+    def fetch_latest(cls, job: Job, serializer=None, timeout: int = 0) -> Optional['Result']:
+        """Returns the latest result for given job instance or ID.
 
-        result_id, payload = response[0]
-        return cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer)
+        If a non-zero timeout is provided, block for a result until timeout is reached.
+        """
+        if timeout:
+            # Unlike blpop, the xread timeout is in milliseconds. "0-0" is the special value for the
+            # first item in the stream, like '-' for xrevrange.
+            timeout_ms = timeout * 1000
+            response = job.connection.xread({cls.get_key(job.id): "0-0"}, block=timeout_ms)
+            if not response:
+                return None
+            response = response[0]  # Querying single stream only.
+            response = response[1]  # Xread also returns the stream key, which we don't need.
+            result_id, payload = response[-1]  # Take most recent result.
+
+        else:
+            # If not blocking, use xrevrange to load a single result (as xread will load them all).
+            response = job.connection.xrevrange(cls.get_key(job.id), '+', '-', count=1)
+            if not response:
+                return None
+            result_id, payload = response[0]
+
+        res = cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer)
+        return res
 
     @classmethod
     def get_key(cls, job_id):
diff --git a/tests/test_job.py b/tests/test_job.py
index 29c309f2..83f9a9d9 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -1,11 +1,12 @@
 import json
 import queue
 import time
+import unittest
 import zlib
 from datetime import datetime, timedelta
 from pickle import dumps, loads
 
-from redis import WatchError
+from redis import Redis, WatchError
 
 from rq.defaults import CALLBACK_TIMEOUT
 from rq.exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError
@@ -20,7 +21,7 @@ from rq.registry import (
     StartedJobRegistry,
 )
 from rq.serializers import JSONSerializer
-from rq.utils import as_text, utcformat, utcnow
+from rq.utils import as_text, get_version, utcformat, utcnow
 from rq.worker import Worker
 
 from tests import RQTestCase, fixtures
@@ -1219,3 +1220,20 @@ class TestJob(RQTestCase):
         self.assertEqual(queue.count, 0)
         self.assertTrue(all(job.is_finished for job in [job_slow_1, job_slow_2, job_A, job_B]))
         self.assertEqual(jobs_completed, ["slow_1:w1", "B:w1", "slow_2:w2", "A"])
+
+    @unittest.skipIf(get_version(Redis()) < (5, 0, 0), 'Skip if Redis server < 5.0')
+    def test_blocking_result_fetch(self):
+        # Ensure blocking waits for the time to run the job, but not right up until the timeout.
+        job_sleep_seconds = 2
+        block_seconds = 5
+        queue_name = "test_blocking_queue"
+        q = Queue(queue_name)
+        job = q.enqueue(fixtures.long_running_job, job_sleep_seconds)
+        started_at = time.time()
+        fixtures.start_worker_process(queue_name, burst=True)
+        result = job.latest_result(timeout=block_seconds)
+        blocked_for = time.time() - started_at
+        self.assertEqual(job.get_status(), JobStatus.FINISHED)
+        self.assertIsNotNone(result)
+        self.assertGreaterEqual(blocked_for, job_sleep_seconds)
+        self.assertLess(blocked_for, block_seconds)
diff --git a/tests/test_results.py b/tests/test_results.py
index e27e872a..7d188c5c 100644
--- a/tests/test_results.py
+++ b/tests/test_results.py
@@ -1,4 +1,5 @@
 import tempfile
+import time
 import unittest
 from datetime import timedelta
 from unittest.mock import PropertyMock, patch
@@ -249,3 +250,30 @@ class TestScheduledJobRegistry(RQTestCase):
         Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
 
         self.assertEqual(Result.count(job), 2)
+
+    def test_blocking_results(self):
+        queue = Queue(connection=self.connection)
+        job = queue.enqueue(say_hello)
+
+        # Should block if there's no result.
+        timeout = 1
+        self.assertIsNone(Result.fetch_latest(job))
+        started_at = time.time()
+        self.assertIsNone(Result.fetch_latest(job, timeout=timeout))
+        blocked_for = time.time() - started_at
+        self.assertGreaterEqual(blocked_for, timeout)
+
+        # Shouldn't block if there's already a result present.
+        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
+        timeout = 1
+        result_sync = Result.fetch_latest(job)
+        started_at = time.time()
+        result_blocking = Result.fetch_latest(job, timeout=timeout)
+        blocked_for = time.time() - started_at
+        self.assertEqual(result_sync.return_value, result_blocking.return_value)
+        self.assertGreater(timeout, blocked_for)
+
+        # Should return the latest result if there are multiple.
+        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=2)
+        result_blocking = Result.fetch_latest(job, timeout=1)
+        self.assertEqual(result_blocking.return_value, 2)
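
A minimal usage sketch of the behaviour this patch adds (not part of the diff itself): the only new client-facing surface is the `timeout` argument accepted by `job.latest_result()` and `Result.fetch_latest()`. The task module `myapp.tasks.slow_add`, the local Redis connection, and the presence of a running worker are assumptions for illustration only.

```python
from redis import Redis
from rq import Queue
from rq.results import Result

# Hypothetical task module; RQ workers must be able to import the task by its module path.
from myapp.tasks import slow_add

redis = Redis()  # Assumes a local Redis >= 5.0, since the blocking path relies on XREAD.
queue = Queue(connection=redis)
job = queue.enqueue(slow_add, 2, 3)

# Block for up to 60 seconds waiting for a worker to write the first result.
# Returns the latest Result immediately if one already exists, or None on timeout.
result = job.latest_result(timeout=60)

if result is not None and result.type == Result.Type.SUCCESSFUL:
    print(result.return_value)
```

Design-wise, the blocking path uses `XREAD` because only it supports `BLOCK`, while the non-blocking path keeps `XREVRANGE` so a single entry can be fetched instead of the whole stream, as the code comments in the patch note.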