Add beginnings of a rqworker script.

This commit is contained in:
Vincent Driessen 2011-11-18 00:51:34 +01:00
parent d780c929c0
commit a029e5437b
2 changed files with 45 additions and 7 deletions

34
bin/rqworker Executable file
View File

@ -0,0 +1,34 @@
#!/usr/bin/env python
import optparse
from rq import use_redis, Queue, Worker
def parse_args():
parser = optparse.OptionParser()
parser.add_option('-b', '--burst', dest='burst',
action='store_true', default=False,
help='Run in burst mode (quit after all work is done).')
parser.add_option('-n', '--name', dest='name',
action='store', type='string', default=None,
help='Specify a different name.')
opts, args = parser.parse_args()
return (opts, args, parser)
def main():
opts, args, parser = parse_args()
use_redis()
if len(args) == 0:
# Use the default queue
queues = [Queue()]
else:
queues = map(Queue, args)
w = Worker(queues, name=opts.name)
if opts.burst:
w.work_burst()
else:
w.work()
if __name__ == '__main__':
main()

View File

@ -3,6 +3,7 @@ import os
import random import random
import time import time
import procname import procname
import socket
from pickle import dumps from pickle import dumps
try: try:
from logbook import Logger from logbook import Logger
@ -17,15 +18,17 @@ def iterable(x):
return hasattr(x, '__iter__') return hasattr(x, '__iter__')
class Worker(object): class Worker(object):
def __init__(self, queues, rv_ttl=500): def __init__(self, queues, name=None, rv_ttl=500):
if isinstance(queues, Queue): if isinstance(queues, Queue):
queues = [queues] queues = [queues]
self._name = name
self.queues = queues self.queues = queues
self.validate_queues() self.validate_queues()
self.rv_ttl = rv_ttl self.rv_ttl = rv_ttl
self._working = False self._working = False
self.log = Logger('worker') self.log = Logger('worker')
def validate_queues(self): def validate_queues(self):
if not iterable(self.queues): if not iterable(self.queues):
raise ValueError('Argument queues not iterable.') raise ValueError('Argument queues not iterable.')
@ -40,12 +43,13 @@ class Worker(object):
return map(lambda q: q.key, self.queues) return map(lambda q: q.key, self.queues)
def is_idle(self): @property
return not self.is_working() def name(self):
if self._name is None:
def is_working(self): hostname = socket.gethostname()
return self._working shortname, _, _ = hostname.partition('.')
self._name = '%s.%s' % (shortname, self.pid)
return self._name
@property @property
def pid(self): def pid(self):