Add result blocking (#1939)

* Add result blocking

* Dev tidyup

* Skip XREAD test on old redis versions

* Lint

* Clarify that the latest result is returned.

* Fix job test ordering

* Remove test latency hack

* Readability improvements
This commit is contained in:
Andrew Nisbet 2023-06-20 18:53:42 -07:00 committed by Selwin Ong
parent f15cad4501
commit 87da060648
5 changed files with 85 additions and 15 deletions

View File

@ -156,3 +156,10 @@ job = Job.fetch(id='my_id', connection=redis)
for result in job.results():
print(result.created_at, result.type)
```
To block until a result arrives, you can pass a timeout in seconds to `job.latest_result()`. If any results already exist, the latest result is returned immediately. If the timeout is reached without a result arriving, a `None` object is returned.
```python
job = queue.enqueue(sleep_for_10_seconds)
result = job.latest_result(timeout=60)  # Will hang for about 10 seconds.
```

View File

@ -807,7 +807,7 @@ class Job:
return self._exc_info
def return_value(self, refresh: bool = False) -> Optional[Any]:
"""Returns the return value of the latest execution, if it was successful
"""Returns the return value of the latest execution, if it was successful.
Args:
refresh (bool, optional): Whether to refresh the current status. Defaults to False.
@ -886,16 +886,18 @@ class Job:
return Result.all(self, serializer=self.serializer)
def latest_result(self, timeout: int = 0) -> Optional['Result']:
    """Get the latest job result.

    Args:
        timeout (int, optional): Number of seconds to block waiting for a result.
            Defaults to 0 (no blocking).

    Returns:
        result (Result): The latest Result object, or ``None`` if no result
            exists and none arrived before the timeout expired.
    """
    # Imported lazily to avoid a circular import between job and results modules.
    from .results import Result

    latest = Result.fetch_latest(self, serializer=self.serializer, timeout=timeout)
    return latest
def restore(self, raw_data) -> Any:
"""Overwrite properties with the provided values stored in Redis.

View File

@ -143,16 +143,31 @@ class Result:
return None
@classmethod
def fetch_latest(cls, job: Job, serializer=None, timeout: int = 0) -> Optional['Result']:
    """Returns the latest result for given job instance or ID.

    If a non-zero timeout is provided, block for a result until timeout is reached.
    """
    stream_key = cls.get_key(job.id)
    if timeout:
        # XREAD's block argument is in milliseconds (unlike BLPOP's seconds).
        # "0-0" requests everything from the start of the stream — the analogue
        # of '-' for XREVRANGE — so an already-present result returns at once.
        reply = job.connection.xread({stream_key: "0-0"}, block=timeout * 1000)
        if not reply:
            return None
        # Only one stream was queried; discard its name and keep its entries.
        _, entries = reply[0]
        # The last entry in the stream is the most recent result.
        result_id, payload = entries[-1]
    else:
        # Not blocking: XREVRANGE can fetch just the newest entry, whereas
        # XREAD would load the whole stream.
        reply = job.connection.xrevrange(stream_key, '+', '-', count=1)
        if not reply:
            return None
        result_id, payload = reply[0]

    return cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer)
@classmethod
def get_key(cls, job_id):

View File

@ -1,11 +1,12 @@
import json
import queue
import time
import unittest
import zlib
from datetime import datetime, timedelta
from pickle import dumps, loads
from redis import WatchError
from redis import Redis, WatchError
from rq.defaults import CALLBACK_TIMEOUT
from rq.exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError
@ -20,7 +21,7 @@ from rq.registry import (
StartedJobRegistry,
)
from rq.serializers import JSONSerializer
from rq.utils import as_text, utcformat, utcnow
from rq.utils import as_text, get_version, utcformat, utcnow
from rq.worker import Worker
from tests import RQTestCase, fixtures
@ -1219,3 +1220,20 @@ class TestJob(RQTestCase):
self.assertEqual(queue.count, 0)
self.assertTrue(all(job.is_finished for job in [job_slow_1, job_slow_2, job_A, job_B]))
self.assertEqual(jobs_completed, ["slow_1:w1", "B:w1", "slow_2:w2", "A"])
@unittest.skipIf(get_version(Redis()) < (5, 0, 0), 'Skip if Redis server < 5.0')
def test_blocking_result_fetch(self):
    # Blocking should last roughly as long as the job takes to run — well
    # short of the full timeout.
    sleep_seconds = 2
    timeout_seconds = 5
    queue_name = "test_blocking_queue"
    queue = Queue(queue_name)
    job = queue.enqueue(fixtures.long_running_job, sleep_seconds)

    start = time.time()
    fixtures.start_worker_process(queue_name, burst=True)
    result = job.latest_result(timeout=timeout_seconds)
    elapsed = time.time() - start

    self.assertEqual(job.get_status(), JobStatus.FINISHED)
    self.assertIsNotNone(result)
    self.assertGreaterEqual(elapsed, sleep_seconds)
    self.assertLess(elapsed, timeout_seconds)

View File

@ -1,4 +1,5 @@
import tempfile
import time
import unittest
from datetime import timedelta
from unittest.mock import PropertyMock, patch
@ -249,3 +250,30 @@ class TestScheduledJobRegistry(RQTestCase):
Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
self.assertEqual(Result.count(job), 2)
def test_blocking_results(self):
    queue = Queue(connection=self.connection)
    job = queue.enqueue(say_hello)

    # With no result present, the call should block for the full timeout
    # and then give up with None.
    timeout = 1
    self.assertIsNone(Result.fetch_latest(job))
    before = time.time()
    self.assertIsNone(Result.fetch_latest(job, timeout=timeout))
    self.assertGreaterEqual(time.time() - before, timeout)

    # An existing result should be returned immediately, without blocking.
    Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
    immediate = Result.fetch_latest(job)
    before = time.time()
    blocking = Result.fetch_latest(job, timeout=timeout)
    waited = time.time() - before
    self.assertEqual(immediate.return_value, blocking.return_value)
    self.assertGreater(timeout, waited)

    # When several results exist, the newest one is returned.
    Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=2)
    newest = Result.fetch_latest(job, timeout=1)
    self.assertEqual(newest.return_value, 2)