Added `worker.wait_for_processing_unit()`.

This commit is contained in:
Selwin Ong 2023-03-23 11:20:06 +07:00
parent 04722339d7
commit 8abdc62d23
2 changed files with 139 additions and 105 deletions

View File

@ -71,8 +71,11 @@ class Queue:
@classmethod
def all(
cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None,
serializer=None, death_penalty_class: Optional[Type[BaseDeathPenalty]] = None
cls,
connection: Optional['Redis'] = None,
job_class: Optional[Type['Job']] = None,
serializer=None,
death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
) -> List['Queue']:
"""Returns an iterable of all Queues.
@ -89,8 +92,11 @@ class Queue:
def to_queue(queue_key):
return cls.from_queue_key(
as_text(queue_key), connection=connection, job_class=job_class,
serializer=serializer, death_penalty_class=death_penalty_class
as_text(queue_key),
connection=connection,
job_class=job_class,
serializer=serializer,
death_penalty_class=death_penalty_class,
)
all_registerd_queues = connection.smembers(cls.redis_queues_keys)
@ -99,12 +105,12 @@ class Queue:
@classmethod
def from_queue_key(
cls,
queue_key: str,
connection: Optional['Redis'] = None,
job_class: Optional[Type['Job']] = None,
serializer: Any = None,
death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
cls,
queue_key: str,
connection: Optional['Redis'] = None,
job_class: Optional[Type['Job']] = None,
serializer: Any = None,
death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
) -> 'Queue':
"""Returns a Queue instance, based on the naming conventions for naming
the internal Redis keys. Can be used to reverse-lookup Queues by their
@ -126,20 +132,25 @@ class Queue:
prefix = cls.redis_queue_namespace_prefix
if not queue_key.startswith(prefix):
raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key))
name = queue_key[len(prefix):]
return cls(name, connection=connection, job_class=job_class, serializer=serializer,
death_penalty_class=death_penalty_class)
name = queue_key[len(prefix) :]
return cls(
name,
connection=connection,
job_class=job_class,
serializer=serializer,
death_penalty_class=death_penalty_class,
)
def __init__(
self,
name: str = 'default',
default_timeout: Optional[int] = None,
connection: Optional['Redis'] = None,
is_async: bool = True,
job_class: Union[str, Type['Job'], None] = None,
serializer: Any = None,
death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty,
**kwargs,
self,
name: str = 'default',
default_timeout: Optional[int] = None,
connection: Optional['Redis'] = None,
is_async: bool = True,
job_class: Union[str, Type['Job'], None] = None,
serializer: Any = None,
death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty,
**kwargs,
):
"""Initializes a Queue object.
@ -207,6 +218,7 @@ class Queue:
@property
def scheduler_pid(self) -> int:
from rq.scheduler import RQScheduler
pid = self.connection.get(RQScheduler.get_locking_key(self.name))
return int(pid.decode()) if pid is not None else None
@ -467,23 +479,23 @@ class Queue:
self.log.debug('Pushed job %s into %s, %s job(s) are in queue.', blue(job_id), green(self.name), result)
def create_job(
self,
func: 'FunctionReferenceType',
args: Union[Tuple, List, None] = None,
kwargs: Optional[Dict] = None,
timeout: Optional[int] = None,
result_ttl: Optional[int] = None,
ttl: Optional[int] = None,
failure_ttl: Optional[int] = None,
description: Optional[str] = None,
depends_on: Optional['JobDependencyType'] = None,
job_id: Optional[str] = None,
meta: Optional[Dict] = None,
status: JobStatus = JobStatus.QUEUED,
retry: Optional['Retry'] = None,
*,
on_success: Optional[Callable] = None,
on_failure: Optional[Callable] = None,
self,
func: 'FunctionReferenceType',
args: Union[Tuple, List, None] = None,
kwargs: Optional[Dict] = None,
timeout: Optional[int] = None,
result_ttl: Optional[int] = None,
ttl: Optional[int] = None,
failure_ttl: Optional[int] = None,
description: Optional[str] = None,
depends_on: Optional['JobDependencyType'] = None,
job_id: Optional[str] = None,
meta: Optional[Dict] = None,
status: JobStatus = JobStatus.QUEUED,
retry: Optional['Retry'] = None,
*,
on_success: Optional[Callable] = None,
on_failure: Optional[Callable] = None,
) -> Job:
"""Creates a job based on parameters given
@ -609,23 +621,23 @@ class Queue:
return job
def enqueue_call(
self,
func: 'FunctionReferenceType',
args: Union[Tuple, List, None] = None,
kwargs: Optional[Dict] = None,
timeout: Optional[int] = None,
result_ttl: Optional[int] = None,
ttl: Optional[int] = None,
failure_ttl: Optional[int] = None,
description: Optional[str] = None,
depends_on: Optional['JobDependencyType'] = None,
job_id: Optional[str] = None,
at_front: bool = False,
meta: Optional[Dict] = None,
retry: Optional['Retry'] = None,
on_success: Optional[Callable[..., Any]] = None,
on_failure: Optional[Callable[..., Any]] = None,
pipeline: Optional['Pipeline'] = None,
self,
func: 'FunctionReferenceType',
args: Union[Tuple, List, None] = None,
kwargs: Optional[Dict] = None,
timeout: Optional[int] = None,
result_ttl: Optional[int] = None,
ttl: Optional[int] = None,
failure_ttl: Optional[int] = None,
description: Optional[str] = None,
depends_on: Optional['JobDependencyType'] = None,
job_id: Optional[str] = None,
at_front: bool = False,
meta: Optional[Dict] = None,
retry: Optional['Retry'] = None,
on_success: Optional[Callable[..., Any]] = None,
on_failure: Optional[Callable[..., Any]] = None,
pipeline: Optional['Pipeline'] = None,
) -> Job:
"""Creates a job to represent the delayed function call and enqueues it.
@ -676,20 +688,20 @@ class Queue:
@staticmethod
def prepare_data(
func: 'FunctionReferenceType',
args: Union[Tuple, List, None] = None,
kwargs: Optional[Dict] = None,
timeout: Optional[int] = None,
result_ttl: Optional[int] = None,
ttl: Optional[int] = None,
failure_ttl: Optional[int] = None,
description: Optional[str] = None,
job_id: Optional[str] = None,
at_front: bool = False,
meta: Optional[Dict] = None,
retry: Optional['Retry'] = None,
on_success: Optional[Callable] = None,
on_failure: Optional[Callable] = None,
func: 'FunctionReferenceType',
args: Union[Tuple, List, None] = None,
kwargs: Optional[Dict] = None,
timeout: Optional[int] = None,
result_ttl: Optional[int] = None,
ttl: Optional[int] = None,
failure_ttl: Optional[int] = None,
description: Optional[str] = None,
job_id: Optional[str] = None,
at_front: bool = False,
meta: Optional[Dict] = None,
retry: Optional['Retry'] = None,
on_success: Optional[Callable] = None,
on_failure: Optional[Callable] = None,
) -> EnqueueData:
"""Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples
And can keep this logic within EnqueueData
@ -1001,7 +1013,6 @@ class Queue:
return self._enqueue_job(job, pipeline=pipeline, at_front=at_front)
return job
def _enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job:
"""Enqueues a job for delayed execution without checking dependencies.
@ -1071,7 +1082,7 @@ class Queue:
return job
def enqueue_dependents(
self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None
self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None
):
"""Enqueues all jobs in the given job's dependents set and clears it.
@ -1108,7 +1119,7 @@ class Queue:
dependent_job_ids, connection=self.connection, serializer=self.serializer
)
if dependent_job
and dependent_job.dependencies_are_met(
and dependent_job.dependencies_are_met(
parent_job=job,
pipeline=pipe,
exclude_job_id=exclude_job_id,
@ -1208,13 +1219,13 @@ class Queue:
@classmethod
def dequeue_any(
cls,
queues: List['Queue'],
timeout: Optional[int],
connection: Optional['Redis'] = None,
job_class: Optional['Job'] = None,
serializer: Any = None,
death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
cls,
queues: List['Queue'],
timeout: Optional[int],
connection: Optional['Redis'] = None,
job_class: Optional[Type['Job']] = None,
serializer: Any = None,
death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
) -> Tuple['Job', 'Queue']:
"""Class method returning the job_class instance at the front of the given
set of Queues, where the order of the queues is important.
@ -1248,8 +1259,13 @@ class Queue:
if result is None:
return None
queue_key, job_id = map(as_text, result)
queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class,
serializer=serializer, death_penalty_class=death_penalty_class)
queue = cls.from_queue_key(
queue_key,
connection=connection,
job_class=job_class,
serializer=serializer,
death_penalty_class=death_penalty_class,
)
try:
job = job_class.fetch(job_id, connection=connection, serializer=serializer)
except NoSuchJobError:

View File

@ -13,8 +13,7 @@ import warnings
from datetime import timedelta
from enum import Enum
from random import shuffle
from typing import (TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type,
Union)
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type, Union
from uuid import uuid4
if TYPE_CHECKING:
@ -57,7 +56,17 @@ from .scheduler import RQScheduler
from .serializers import resolve_serializer
from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact, as_text
from .utils import (
backend_class,
ensure_list,
get_version,
make_colorizer,
utcformat,
utcnow,
utcparse,
compact,
as_text,
)
from .version import VERSION
from .serializers import resolve_serializer
@ -249,7 +258,7 @@ class Worker:
disable_default_exception_handler: bool = False,
prepare_for_work: bool = True,
serializer=None,
work_horse_killed_handler: Optional[Callable[[Job, int, int, 'struct_rusage'], None]] = None
work_horse_killed_handler: Optional[Callable[[Job, int, int, 'struct_rusage'], None]] = None,
): # noqa
self.default_result_ttl = default_result_ttl
self.worker_ttl = default_worker_ttl
@ -267,8 +276,13 @@ class Worker:
self.serializer = resolve_serializer(serializer)
queues = [
self.queue_class(name=q, connection=connection, job_class=self.job_class,
serializer=self.serializer, death_penalty_class=self.death_penalty_class,)
self.queue_class(
name=q,
connection=connection,
job_class=self.job_class,
serializer=self.serializer,
death_penalty_class=self.death_penalty_class,
)
if isinstance(q, str)
else q
for q in ensure_list(queues)
@ -706,7 +720,7 @@ class Worker:
return
if self._dequeue_strategy == DequeueStrategy.ROUND_ROBIN:
pos = self._ordered_queues.index(reference_queue)
self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
return
if self._dequeue_strategy == DequeueStrategy.RANDOM:
shuffle(self._ordered_queues)
@ -716,7 +730,7 @@ class Worker:
self,
logging_level: str = "INFO",
date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
log_format: str = DEFAULT_LOGGING_FORMAT
log_format: str = DEFAULT_LOGGING_FORMAT,
):
"""Bootstraps the worker.
Runs the basic tasks that should run when the worker actually starts working.
@ -772,6 +786,12 @@ class Worker:
else:
self.scheduler.start()
def wait_for_processing_unit(self):
"""Wait for a processing unit to be available.
This is used to limit the number of jobs that can be processed at the same time.
This implementation does not wait: it returns immediately with no value.
The work loop calls it at the start of every iteration, before the
``max_jobs`` check and before a job is dequeued.
NOTE(review): presumably intended as a hook for worker subclasses that cap
concurrency -- confirm against the subclasses that override it.
"""
return
def work(
self,
burst: bool = False,
@ -781,7 +801,7 @@ class Worker:
max_jobs: Optional[int] = None,
max_idle_time: Optional[int] = None,
with_scheduler: bool = False,
dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT
dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT,
) -> bool:
"""Starts the work loop.
@ -814,6 +834,12 @@ class Worker:
self._install_signal_handlers()
try:
while True:
self.wait_for_processing_unit()
if max_jobs is not None:
if completed_jobs >= max_jobs:
self.log.info('Worker %s: finished executing %d jobs, quitting', self.key, completed_jobs)
break
try:
self.check_for_suspension(burst)
@ -836,12 +862,7 @@ class Worker:
job, queue = result
self.execute_job(job, queue)
self.heartbeat()
completed_jobs += 1
if max_jobs is not None:
if completed_jobs >= max_jobs:
self.log.info('Worker %s: finished executing %d jobs, quitting', self.key, completed_jobs)
break
except redis.exceptions.TimeoutError:
self.log.error('Worker %s: Redis connection timeout, quitting...', self.key)
@ -881,7 +902,9 @@ class Worker:
pass
self.scheduler._process.join()
def dequeue_job_and_maintain_ttl(self, timeout: Optional[int], max_idle_time: Optional[int] = None) -> Tuple['Job', 'Queue']:
def dequeue_job_and_maintain_ttl(
self, timeout: Optional[int], max_idle_time: Optional[int] = None
) -> Tuple['Job', 'Queue']:
"""Dequeues a job while maintaining the TTL.
Returns:
@ -1167,10 +1190,7 @@ class Worker:
self.log.warning('Moving job to FailedJobRegistry (%s)', exc_string)
self.handle_work_horse_killed(job, retpid, ret_val, rusage)
self.handle_job_failure(
job, queue=queue,
exc_string=exc_string
)
self.handle_job_failure(job, queue=queue, exc_string=exc_string)
def execute_job(self, job: 'Job', queue: 'Queue'):
"""Spawns a work horse to perform the actual work and passes it a job.
@ -1458,9 +1478,7 @@ class Worker:
extra.update({'queue': job.origin, 'job_id': job.id})
# func_name
self.log.error(
'[Job %s]: exception raised while executing (%s)\n' + exc_string, job.id, func_name, extra=extra
)
self.log.error('[Job %s]: exception raised while executing (%s)\n' + exc_string, job.id, func_name, extra=extra)
for handler in self._exc_handlers:
self.log.debug('Invoking exception handler %s', handler)
@ -1598,7 +1616,7 @@ class RoundRobinWorker(Worker):
def reorder_queues(self, reference_queue):
pos = self._ordered_queues.index(reference_queue)
self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
class RandomWorker(Worker):