Added `desc` argument to registry.get_job_ids() (#2129)

* Added `desc` argument to registry.get_job_ids()

* Added argument description to registry.get_job_ids()
This commit is contained in:
Selwin Ong 2024-10-13 07:08:54 +07:00 committed by GitHub
parent fda862e600
commit fc9610ae53
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 7 additions and 6 deletions

View File

@ -153,18 +153,19 @@ class BaseRegistry:
expired_jobs = self.connection.zrangebyscore(self.key, 0, score) expired_jobs = self.connection.zrangebyscore(self.key, 0, score)
return [as_text(job_id) for job_id in expired_jobs] return [as_text(job_id) for job_id in expired_jobs]
def get_job_ids(self, start: int = 0, end: int = -1): def get_job_ids(self, start: int = 0, end: int = -1, desc: bool = False):
"""Returns list of all job ids. """Returns list of all job ids.
Args: Args:
start (int, optional): _description_. Defaults to 0. start (int, optional): _description_. Defaults to 0.
end (int, optional): _description_. Defaults to -1. end (int, optional): _description_. Defaults to -1.
end (bool, optional): _description_. Defaults to False.
Returns: Returns:
_type_: _description_ _type_: _description_
""" """
self.cleanup() self.cleanup()
return [as_text(job_id) for job_id in self.connection.zrange(self.key, start, end)] return [as_text(job_id) for job_id in self.connection.zrange(self.key, start, end, desc=desc)]
def get_queue(self): def get_queue(self):
"""Returns Queue object associated with this registry.""" """Returns Queue object associated with this registry."""
@ -207,7 +208,7 @@ class BaseRegistry:
queue = Queue(job.origin, connection=self.connection, job_class=self.job_class, serializer=serializer) queue = Queue(job.origin, connection=self.connection, job_class=self.job_class, serializer=serializer)
job.started_at = None job.started_at = None
job.ended_at = None job.ended_at = None
job._exc_info = '' job._exc_info = '' # TODO: this should be removed
job.save() job.save()
job = queue._enqueue_job(job, pipeline=pipeline, at_front=at_front) job = queue._enqueue_job(job, pipeline=pipeline, at_front=at_front)
pipeline.execute() pipeline.execute()

View File

@ -612,7 +612,7 @@ class BaseWorker:
def cleanup_execution(self, job: 'Job', pipeline: 'Pipeline'): def cleanup_execution(self, job: 'Job', pipeline: 'Pipeline'):
"""Cleans up the execution of a job. """Cleans up the execution of a job.
It will remove the job from the `StartedJobRegistry` and deleting the Execution object. It will remove the job from the `StartedJobRegistry` and delete the Execution object.
""" """
started_job_registry = StartedJobRegistry( started_job_registry = StartedJobRegistry(
job.origin, self.connection, job_class=self.job_class, serializer=self.serializer job.origin, self.connection, job_class=self.job_class, serializer=self.serializer
@ -1548,9 +1548,9 @@ class Worker(BaseWorker):
job.started_at = now() job.started_at = now()
timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id): with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id):
self.log.debug('Performing Job...') self.log.debug('Performing Job %s ...', job.id)
rv = job.perform() rv = job.perform()
self.log.debug('Finished performing Job ID %s', job.id) self.log.debug('Finished performing Job %s', job.id)
self.handle_execution_ended(job, queue, job.success_callback_timeout) self.handle_execution_ended(job, queue, job.success_callback_timeout)
# Pickle the result in the same try-except block since we need # Pickle the result in the same try-except block since we need