mirror of https://github.com/rq/rq.git
Ensure the Redis socket timeout is long enough for blocking operations (#2120)
This change checks for Redis connection socket timeouts that are too short for operations such as BLPOP, and adjusts them to at least be the expected timeout.
This commit is contained in:
parent
d5e55ab95a
commit
ccdff1f003
|
@ -421,15 +421,16 @@ class BaseWorker:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _set_connection(self, connection: 'Redis') -> 'Redis':
|
def _set_connection(self, connection: 'Redis') -> 'Redis':
|
||||||
"""Configures the Redis connection to have a socket timeout.
|
"""Configures the Redis connection's socket timeout.
|
||||||
This should timouet the connection in case any specific command hangs at any given time (eg. BLPOP).
|
This will timeout the connection in case any specific command hangs at any given time (eg. BLPOP), but
|
||||||
If the connection provided already has a `socket_timeout` defined, skips.
|
also ensures that the timeout is long enough for those operations.
|
||||||
|
If the connection provided already has an adequate `socket_timeout` defined, skips.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
connection (Optional[Redis]): The Redis Connection.
|
connection (Optional[Redis]): The Redis Connection.
|
||||||
"""
|
"""
|
||||||
current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout")
|
current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout")
|
||||||
if current_socket_timeout is None:
|
if current_socket_timeout is None or current_socket_timeout < self.connection_timeout:
|
||||||
timeout_config = {"socket_timeout": self.connection_timeout}
|
timeout_config = {"socket_timeout": self.connection_timeout}
|
||||||
connection.connection_pool.connection_kwargs.update(timeout_config)
|
connection.connection_pool.connection_kwargs.update(timeout_config)
|
||||||
return connection
|
return connection
|
||||||
|
|
|
@ -694,10 +694,11 @@ class TestWorker(RQTestCase):
|
||||||
self.assertIsNone(w.dequeue_job_and_maintain_ttl(3, max_idle_time=2))
|
self.assertIsNone(w.dequeue_job_and_maintain_ttl(3, max_idle_time=2))
|
||||||
self.assertLess((now() - right_now).total_seconds(), 4) # 4 for some buffer
|
self.assertLess((now() - right_now).total_seconds(), 4) # 4 for some buffer
|
||||||
|
|
||||||
# idle for 3 seconds because idle_time is less than two rounds of timeout
|
|
||||||
right_now = now()
|
|
||||||
w = Worker([q])
|
w = Worker([q])
|
||||||
w.worker_ttl = 2
|
w.worker_ttl = 2
|
||||||
|
right_now = now()
|
||||||
|
|
||||||
|
# idle for 3 seconds because idle_time is less than two rounds of timeout
|
||||||
w.work(max_idle_time=3)
|
w.work(max_idle_time=3)
|
||||||
self.assertLess((now() - right_now).total_seconds(), 5) # 5 for some buffer
|
self.assertLess((now() - right_now).total_seconds(), 5) # 5 for some buffer
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue