diff --git a/rq/cli/cli.py b/rq/cli/cli.py old mode 100755 new mode 100644 diff --git a/rq/job.py b/rq/job.py index c7bc7a7a..d1439d68 100644 --- a/rq/job.py +++ b/rq/job.py @@ -254,6 +254,7 @@ class Job: on_success = Callback(on_success) # backward compatibility job._success_callback_name = on_success.name job._success_callback_timeout = on_success.timeout + job._success_callback_params = on_success.params if on_failure: if not isinstance(on_failure, Callback): @@ -637,6 +638,7 @@ class Job: self._args = UNEVALUATED self._kwargs = UNEVALUATED self._success_callback_name = None + self._success_callback_params = None self._success_callback = UNEVALUATED self._failure_callback_name = None self._failure_callback = UNEVALUATED @@ -942,6 +944,9 @@ class Job: if obj.get('success_callback_name'): self._success_callback_name = obj.get('success_callback_name').decode() + if obj.get('success_callback_params'): + self._success_callback_params = json.loads(obj.get('success_callback_params').decode()) + if 'success_callback_timeout' in obj: self._success_callback_timeout = int(obj.get('success_callback_timeout')) @@ -1018,6 +1023,8 @@ class Job: 'worker_name': self.worker_name or '', } + if self._success_callback_params is not None: + obj['success_callback_params'] = json.dumps(self._success_callback_params) if self.retries_left is not None: obj['retries_left'] = self.retries_left if self.retry_intervals is not None: @@ -1418,7 +1425,7 @@ class Job: logger.debug('Running success callbacks for %s', self.id) with death_penalty_class(self.success_callback_timeout, JobTimeoutException, job_id=self.id): - self.success_callback(self, self.connection, result) + self.success_callback(self, self.connection, result, self._success_callback_params) def execute_failure_callback(self, death_penalty_class: Type[BaseDeathPenalty], *exc_info): """Executes failure_callback with possible timeout""" @@ -1645,12 +1652,13 @@ class Retry: class Callback: - def __init__(self, func: Union[str, Callable[..., Any]], timeout: Optional[Any] = None): + def __init__(self, func: Union[str, Callable[..., Any]], timeout: Optional[Any] = None, params: Optional[Dict[str, Any]] = None): if not isinstance(func, str) and not inspect.isfunction(func) and not inspect.isbuiltin(func): raise ValueError('Callback `func` must be a string or function') self.func = func self.timeout = parse_timeout(timeout) if timeout else CALLBACK_TIMEOUT + self.params = params @property def name(self) -> str: diff --git a/rq/worker.py b/rq/worker.py index e6751352..55b27c8e 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1472,6 +1472,7 @@ class Worker(BaseWorker): timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id): self.log.debug('Performing Job...') + self.log.debug(f"custom job params: {job._success_callback_params}") rv = job.perform() self.log.debug('Finished performing Job ID %s', job.id)