diff --git a/rq/cli/cli.py b/rq/cli/cli.py index ddfa8a32..1bf91516 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -223,6 +223,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @click.option('--pid', help='Write the process ID number to a file at the specified path') @click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler') @click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute') +@click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute') @click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler') @click.option('--serializer', '-S', default=None, help='Run worker with custom serializer') @click.argument('queues', nargs=-1) @@ -246,6 +247,7 @@ def worker( pid, disable_default_exception_handler, max_jobs, + max_idle_time, with_scheduler, queues, log_format, @@ -317,6 +319,7 @@ def worker( date_format=date_format, log_format=log_format, max_jobs=max_jobs, + max_idle_time=max_idle_time, with_scheduler=with_scheduler, ) except ConnectionError as e: diff --git a/rq/queue.py b/rq/queue.py index 3c483e8f..45d6a402 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1140,7 +1140,7 @@ class Queue: return as_text(self.connection.lpop(self.key)) @classmethod - def lpop(cls, queue_keys: List[str], timeout: int, connection: Optional['Redis'] = None): + def lpop(cls, queue_keys: List[str], timeout: Optional[int], connection: Optional['Redis'] = None): """Helper method. Intermediate method to abstract away from some Redis API details, where LPOP accepts only a single key, whereas BLPOP accepts multiple. So if we want the non-blocking LPOP, we need to @@ -1155,7 +1155,7 @@ class Queue: Args: queue_keys (_type_): _description_ - timeout (int): _description_ + timeout (Optional[int]): _description_ connection (Optional[Redis], optional): _description_. Defaults to None. Raises: @@ -1188,7 +1188,7 @@ class Queue: def dequeue_any( cls, queues: List['Queue'], - timeout: int, + timeout: Optional[int], connection: Optional['Redis'] = None, job_class: Optional['Job'] = None, serializer: Any = None, @@ -1205,7 +1205,7 @@ class Queue: Args: queues (List[Queue]): List of queue objects - timeout (int): Timeout for the LPOP + timeout (Optional[int]): Timeout for the LPOP connection (Optional[Redis], optional): Redis Connection. Defaults to None. job_class (Optional[Job], optional): The job classification. Defaults to None. serializer (Any, optional): Serializer to use. Defaults to None. diff --git a/rq/worker.py b/rq/worker.py index 3e95a3fd..2cf4d188 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1,6 +1,7 @@ import contextlib import errno import logging +import math import os import random import resource @@ -746,6 +747,7 @@ class Worker: date_format: str = DEFAULT_LOGGING_DATE_FORMAT, log_format: str = DEFAULT_LOGGING_FORMAT, max_jobs: Optional[int] = None, + max_idle_time: Optional[int] = None, with_scheduler: bool = False, ) -> bool: """Starts the work loop. @@ -753,6 +755,7 @@ class Worker: Pops and performs all jobs on the current list of queues. When all queues are empty, block and wait for new jobs to arrive on any of the queues, unless `burst` mode is enabled. + If `max_idle_time` is provided, worker will die when it's idle for more than the provided value. The return value indicates whether any jobs were processed. @@ -762,6 +765,7 @@ class Worker: date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT. log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT. max_jobs (Optional[int], optional): Max number of jobs. Defaults to None. + max_idle_time (Optional[int], optional): Max seconds for worker to be idle. Defaults to None. with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False. Returns: @@ -786,10 +790,12 @@ class Worker: break timeout = None if burst else self.dequeue_timeout - result = self.dequeue_job_and_maintain_ttl(timeout) + result = self.dequeue_job_and_maintain_ttl(timeout, max_idle_time) if result is None: if burst: self.log.info("Worker %s: done, quitting", self.key) + elif max_idle_time is not None: + self.log.info("Worker %s: idle for %d seconds, quitting", self.key, max_idle_time) break job, queue = result @@ -841,7 +847,7 @@ class Worker: pass self.scheduler._process.join() - def dequeue_job_and_maintain_ttl(self, timeout: int) -> Tuple['Job', 'Queue']: + def dequeue_job_and_maintain_ttl(self, timeout: Optional[int], max_idle_time: Optional[int] = None) -> Tuple['Job', 'Queue']: """Dequeues a job while maintaining the TTL. Returns: @@ -854,6 +860,8 @@ class Worker: self.procline('Listening on ' + qnames) self.log.debug('*** Listening on %s...', green(qnames)) connection_wait_time = 1.0 + idle_since = utcnow() + idle_time_left = max_idle_time while True: try: self.heartbeat() @@ -861,6 +869,9 @@ class Worker: if self.should_run_maintenance_tasks: self.run_maintenance_tasks() + if timeout is not None and idle_time_left is not None: + timeout = min(timeout, idle_time_left) + self.log.debug(f"Dequeueing jobs on queues {green(qnames)} and timeout {timeout}") result = self.queue_class.dequeue_any( self._ordered_queues, @@ -880,7 +891,11 @@ class Worker: break except DequeueTimeout: - pass + if max_idle_time is not None: + idle_for = (utcnow() - idle_since).total_seconds() + idle_time_left = math.ceil(max_idle_time - idle_for) + if idle_time_left <= 0: + break except redis.exceptions.ConnectionError as conn_err: self.log.error( 'Could not connect to Redis instance: %s Retrying in %d seconds...', conn_err, connection_wait_time diff --git a/tests/test_worker.py b/tests/test_worker.py index 99f65951..239252ce 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -608,6 +608,31 @@ class TestWorker(RQTestCase): # Should not have created evidence of execution self.assertEqual(os.path.exists(SENTINEL_FILE), False) + @slow + def test_max_idle_time(self): + q = Queue() + w = Worker([q]) + q.enqueue(say_hello, args=('Frank',)) + self.assertIsNotNone(w.dequeue_job_and_maintain_ttl(1)) + + # idle for 1 second + self.assertIsNone(w.dequeue_job_and_maintain_ttl(1, max_idle_time=1)) + + # idle for 3 seconds + now = utcnow() + self.assertIsNone(w.dequeue_job_and_maintain_ttl(1, max_idle_time=3)) + self.assertLess((utcnow()-now).total_seconds(), 5) # 5 for some buffer + + # idle for 2 seconds because idle_time is less than timeout + now = utcnow() + self.assertIsNone(w.dequeue_job_and_maintain_ttl(3, max_idle_time=2)) + self.assertLess((utcnow()-now).total_seconds(), 4) # 4 for some buffer + + # idle for 3 seconds because idle_time is less than two rounds of timeout + now = utcnow() + self.assertIsNone(w.dequeue_job_and_maintain_ttl(2, max_idle_time=3)) + self.assertLess((utcnow()-now).total_seconds(), 5) # 5 for some buffer + @slow # noqa def test_timeouts(self): """Worker kills jobs after timeout.""" @@ -640,7 +665,6 @@ class TestWorker(RQTestCase): q = Queue() w = Worker([q]) - # Put it on the queue with a timeout value self.assertIsNone(w.dequeue_job_and_maintain_ttl(None)) def test_worker_ttl_param_resolves_timeout(self):