mirror of https://github.com/rq/rq.git
Exit gracefully when user hits Ctrl+C in a worker.
The currently running task will be waited for, so it can gracefully be finished. Further execution will be stopped. If, during this waiting phase, Ctrl+C is hit again, the worker and the horse will be terminated forcefully (this means work could be lost or partially finished).
This commit is contained in:
parent
ba965a1dd9
commit
e278bd2967
41
rq/worker.py
41
rq/worker.py
|
@ -1,9 +1,11 @@
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
|
import errno
|
||||||
import random
|
import random
|
||||||
import time
|
import time
|
||||||
import procname
|
import procname
|
||||||
import socket
|
import socket
|
||||||
|
import signal
|
||||||
from pickle import dumps
|
from pickle import dumps
|
||||||
try:
|
try:
|
||||||
from logbook import Logger
|
from logbook import Logger
|
||||||
|
@ -63,6 +65,7 @@ class Worker(object):
|
||||||
self.rv_ttl = rv_ttl
|
self.rv_ttl = rv_ttl
|
||||||
self._state = 'starting'
|
self._state = 'starting'
|
||||||
self._is_horse = False
|
self._is_horse = False
|
||||||
|
self._stopped = False
|
||||||
self.log = Logger('worker')
|
self.log = Logger('worker')
|
||||||
|
|
||||||
|
|
||||||
|
@ -154,14 +157,46 @@ class Worker(object):
|
||||||
|
|
||||||
state = property(get_state, set_state)
|
state = property(get_state, set_state)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def stopped(self):
|
||||||
|
return self._stopped
|
||||||
|
|
||||||
|
def install_sigint_handler(self):
|
||||||
|
def request_force_stop(signum, frame):
|
||||||
|
"""Terminates the application."""
|
||||||
|
self.log.warning('Cold shut down.')
|
||||||
|
raise SystemExit()
|
||||||
|
|
||||||
|
def request_stop(signum, frame):
|
||||||
|
signal.signal(signal.SIGINT, request_force_stop)
|
||||||
|
|
||||||
|
if self.is_horse:
|
||||||
|
self.log.debug('Ignoring SIGINT.')
|
||||||
|
return
|
||||||
|
|
||||||
|
self.log.warning('Warm shut down. Press Ctrl+C again for a cold shutdown.')
|
||||||
|
|
||||||
|
#if self.state == 'idle':
|
||||||
|
# raise SystemExit()
|
||||||
|
self._stopped = True
|
||||||
|
self.log.debug('Stopping after current horse is finished.')
|
||||||
|
|
||||||
|
signal.signal(signal.SIGINT, request_stop)
|
||||||
|
|
||||||
|
|
||||||
def _work(self, quit_when_done=False):
|
def _work(self, quit_when_done=False):
|
||||||
"""This method starts the work loop.
|
"""This method starts the work loop.
|
||||||
"""
|
"""
|
||||||
|
self.install_sigint_handler()
|
||||||
|
|
||||||
did_work = False
|
did_work = False
|
||||||
self.register_birth()
|
self.register_birth()
|
||||||
self.state = 'starting'
|
self.state = 'starting'
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
|
if self.stopped:
|
||||||
|
self.log.info('Stopping on request.')
|
||||||
|
break
|
||||||
self.state = 'idle'
|
self.state = 'idle'
|
||||||
qnames = self.queue_names()
|
qnames = self.queue_names()
|
||||||
self.procline('Listening on %s' % (','.join(qnames)))
|
self.procline('Listening on %s' % (','.join(qnames)))
|
||||||
|
@ -171,10 +206,12 @@ class Worker(object):
|
||||||
if job is None:
|
if job is None:
|
||||||
break
|
break
|
||||||
self.state = 'busy'
|
self.state = 'busy'
|
||||||
|
|
||||||
self.fork_and_perform_job(job)
|
self.fork_and_perform_job(job)
|
||||||
|
|
||||||
did_work = True
|
did_work = True
|
||||||
finally:
|
finally:
|
||||||
if not self._is_horse:
|
if not self.is_horse:
|
||||||
self.register_death()
|
self.register_death()
|
||||||
return did_work
|
return did_work
|
||||||
|
|
||||||
|
@ -201,7 +238,7 @@ class Worker(object):
|
||||||
self.log = Logger('horse')
|
self.log = Logger('horse')
|
||||||
try:
|
try:
|
||||||
self.perform_job(job)
|
self.perform_job(job)
|
||||||
except Exception, e:
|
except Exception as e:
|
||||||
self.log.exception(e)
|
self.log.exception(e)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
Loading…
Reference in New Issue