diff --git a/rq/worker.py b/rq/worker.py index 4475d498..0eaae0a1 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -679,6 +679,66 @@ class Worker: """ pass + def bootstrap( + self, + logging_level: str = "INFO", + date_format: str = DEFAULT_LOGGING_DATE_FORMAT, + log_format: str = DEFAULT_LOGGING_FORMAT, + ): + """Bootstraps the worker. + Runs the basic tasks that should run when the worker actually starts working. + Used so that new workers can focus on the work loop implementation rather + than the full bootstraping process. + + Args: + logging_level (str, optional): Logging level to use. Defaults to "INFO". + date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT. + log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT. + """ + setup_loghandlers(logging_level, date_format, log_format) + self.register_birth() + self.log.info("Worker %s: started, version %s", self.key, VERSION) + self.subscribe() + self.set_state(WorkerStatus.STARTED) + qnames = self.queue_names() + self.log.info('*** Listening on %s...', green(', '.join(qnames))) + + def _start_scheduler( + self, + burst: bool = False, + logging_level: str = "INFO", + date_format: str = DEFAULT_LOGGING_DATE_FORMAT, + log_format: str = DEFAULT_LOGGING_FORMAT, + ): + """Starts the scheduler process. + This is specifically designed to be run by the worker when running the `work()` method. + Instanciates the RQScheduler and tries to acquire a lock. + If the lock is acquired, start scheduler. + If worker is on burst mode just enqueues scheduled jobs and quits, + otherwise, starts the scheduler in a separate process. + + Args: + burst (bool, optional): Whether to work on burst mode. Defaults to False. + logging_level (str, optional): Logging level to use. Defaults to "INFO". + date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT. + log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT. + """ + self.scheduler = RQScheduler( + self.queues, + connection=self.connection, + logging_level=logging_level, + date_format=date_format, + log_format=log_format, + serializer=self.serializer, + ) + self.scheduler.acquire_locks() + if self.scheduler.acquired_locks: + if burst: + self.scheduler.enqueue_scheduled_jobs() + self.scheduler.release_locks() + else: + self.scheduler.start() + def work( self, burst: bool = False, @@ -707,34 +767,10 @@ class Worker: Returns: worked (bool): Will return True if any job was processed, False otherwise. """ - setup_loghandlers(logging_level, date_format, log_format) + self.bootstrap(logging_level, date_format, log_format) completed_jobs = 0 - self.register_birth() - self.log.info("Worker %s: started, version %s", self.key, VERSION) - self.subscribe() - self.set_state(WorkerStatus.STARTED) - qnames = self.queue_names() - self.log.info('*** Listening on %s...', green(', '.join(qnames))) - if with_scheduler: - self.scheduler = RQScheduler( - self.queues, - connection=self.connection, - logging_level=logging_level, - date_format=date_format, - log_format=log_format, - serializer=self.serializer, - ) - self.scheduler.acquire_locks() - # If lock is acquired, start scheduler - if self.scheduler.acquired_locks: - # If worker is run on burst mode, enqueue_scheduled_jobs() - # before working. Otherwise, start scheduler in a separate process - if burst: - self.scheduler.enqueue_scheduled_jobs() - self.scheduler.release_locks() - else: - self.scheduler.start() + self._start_scheduler(burst, logging_level, date_format, log_format) self._install_signal_handlers() try: