mirror of https://github.com/rq/rq.git
add: custom parameters
This commit is contained in:
parent
a044248430
commit
56a71ac351
12
rq/job.py
12
rq/job.py
|
@ -254,6 +254,7 @@ class Job:
|
|||
on_success = Callback(on_success) # backward compatibility
|
||||
job._success_callback_name = on_success.name
|
||||
job._success_callback_timeout = on_success.timeout
|
||||
job._success_callback_params = on_success.params
|
||||
|
||||
if on_failure:
|
||||
if not isinstance(on_failure, Callback):
|
||||
|
@ -637,6 +638,7 @@ class Job:
|
|||
self._args = UNEVALUATED
|
||||
self._kwargs = UNEVALUATED
|
||||
self._success_callback_name = None
|
||||
self._success_callback_params = None
|
||||
self._success_callback = UNEVALUATED
|
||||
self._failure_callback_name = None
|
||||
self._failure_callback = UNEVALUATED
|
||||
|
@ -942,6 +944,9 @@ class Job:
|
|||
if obj.get('success_callback_name'):
|
||||
self._success_callback_name = obj.get('success_callback_name').decode()
|
||||
|
||||
if obj.get('success_callback_params'):
|
||||
self._success_callback_params = json.loads(obj.get('success_callback_params').decode())
|
||||
|
||||
if 'success_callback_timeout' in obj:
|
||||
self._success_callback_timeout = int(obj.get('success_callback_timeout'))
|
||||
|
||||
|
@ -1018,6 +1023,8 @@ class Job:
|
|||
'worker_name': self.worker_name or '',
|
||||
}
|
||||
|
||||
if self._success_callback_params is not None:
|
||||
obj['success_callback_params'] = json.dumps(self._success_callback_params)
|
||||
if self.retries_left is not None:
|
||||
obj['retries_left'] = self.retries_left
|
||||
if self.retry_intervals is not None:
|
||||
|
@ -1418,7 +1425,7 @@ class Job:
|
|||
|
||||
logger.debug('Running success callbacks for %s', self.id)
|
||||
with death_penalty_class(self.success_callback_timeout, JobTimeoutException, job_id=self.id):
|
||||
self.success_callback(self, self.connection, result)
|
||||
self.success_callback(self, self.connection, result, self._success_callback_params)
|
||||
|
||||
def execute_failure_callback(self, death_penalty_class: Type[BaseDeathPenalty], *exc_info):
|
||||
"""Executes failure_callback with possible timeout"""
|
||||
|
@ -1645,12 +1652,13 @@ class Retry:
|
|||
|
||||
|
||||
class Callback:
|
||||
def __init__(self, func: Union[str, Callable[..., Any]], timeout: Optional[Any] = None):
|
||||
def __init__(self, func: Union[str, Callable[..., Any]], timeout: Optional[Any] = None, params: Optional[Dict[str, Any]] = None):
|
||||
if not isinstance(func, str) and not inspect.isfunction(func) and not inspect.isbuiltin(func):
|
||||
raise ValueError('Callback `func` must be a string or function')
|
||||
|
||||
self.func = func
|
||||
self.timeout = parse_timeout(timeout) if timeout else CALLBACK_TIMEOUT
|
||||
self.params = params
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
|
|
|
@ -1472,6 +1472,7 @@ class Worker(BaseWorker):
|
|||
timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
|
||||
with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id):
|
||||
self.log.debug('Performing Job...')
|
||||
self.log.debug(f"custom job params: {job._success_callback_params}")
|
||||
rv = job.perform()
|
||||
self.log.debug('Finished performing Job ID %s', job.id)
|
||||
|
||||
|
|
Loading…
Reference in New Issue