diff --git a/bin/rqinfo b/bin/rqinfo index 06e8192e..ea3eebf8 100755 --- a/bin/rqinfo +++ b/bin/rqinfo @@ -38,80 +38,94 @@ def state_symbol(state): def show_queues(args): - while True: - if len(args.queues): - qs = map(Queue, args.queues) - else: - qs = Queue.all() + if len(args.queues): + qs = map(Queue, args.queues) + else: + qs = Queue.all() - num_jobs = 0 - termwidth, _ = gettermsize() - chartwidth = min(20, termwidth - 20) + num_jobs = 0 + termwidth, _ = gettermsize() + chartwidth = min(20, termwidth - 20) - max_count = 0 - counts = dict() - for q in qs: - count = q.count - counts[q] = count - max_count = max(max_count, count) - scale = get_scale(max_count) - ratio = chartwidth * 1.0 / scale + max_count = 0 + counts = dict() + for q in qs: + count = q.count + counts[q] = count + max_count = max(max_count, count) + scale = get_scale(max_count) + ratio = chartwidth * 1.0 / scale - if args.interval: - os.system('clear') - - for q in qs: - count = counts[q] - if not args.raw: - chart = green('|' + '█' * int(ratio * count)) - line = '%-12s %s %d' % (q.name, chart, count) - else: - line = '%-12s %d' % (q.name, count) - print(line) - - num_jobs += count - - # Print summary when not in raw mode + for q in qs: + count = counts[q] if not args.raw: - print('%d queues, %d jobs total' % (len(qs), num_jobs)) - - if args.interval: - time.sleep(args.interval) + chart = green('|' + '█' * int(ratio * count)) + line = '%-12s %s %d' % (q.name, chart, count) else: - break + line = 'queue %s %d' % (q.name, count) + print(line) + + num_jobs += count + + # Print summary when not in raw mode + if not args.raw: + print('%d queues, %d jobs total' % (len(qs), num_jobs)) + def show_workers(args): - while True: + if len(args.queues): + qs = map(Queue, args.queues) + + def any_matching_queue(worker): + def queue_matches(q): + return q in qs + return any(map(queue_matches, worker.queues)) + + # Filter out workers that don't match the queue filter + ws = [w for w in Worker.all() if any_matching_queue(w)] + else: qs = Queue.all() ws = Worker.all() - if args.interval: - os.system('clear') + if not args.by_queue: + def filter_queues(queue_names): + return [qname for qname in queue_names if Queue(qname) in qs] - queues = {qname: [] for qname in qs} + for w in ws: + if not args.raw: + print '%s %s: %s' % (w.name, state_symbol(w.state), ', '.join(filter_queues(w.queue_names()))) + else: + print 'worker %s %s %s' % (w.name, w.state, ','.join(filter_queues(w.queue_names()))) + else: + # Create reverse lookup table + queues = {q: [] for q in qs} for w in ws: for q in w.queues: if not q in queues: - queues[q] = [] + continue queues[q].append(w) - if args.by_queue: - max_qname = max(map(lambda q: len(q.name), queues.keys())) if queues else 0 - for q in queues: - if queues[q]: - queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.state)), queues[q]))) - else: - queues_str = '–' - print '%s %s' % (pad(q.name + ':', max_qname + 1), queues_str) - else: - for w in ws: - print '%s %s: %s' % (w.name, state_symbol(w.state), ', '.join(w.queue_names())) - print '%d workers, %d queues' % (len(ws), len(queues)) + max_qname = max(map(lambda q: len(q.name), queues.keys())) if queues else 0 + for q in queues: + if queues[q]: + queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.state)), queues[q]))) + else: + queues_str = '–' + print '%s %s' % (pad(q.name + ':', max_qname + 1), queues_str) - if args.interval: - time.sleep(args.interval) - else: - break + if not args.raw: + print '%d workers, %d queues' % (len(ws), len(qs)) + + +def show_both(args): + show_queues(args) + if not args.raw: + print '' + show_workers(args) + if not args.raw: + print '' + import datetime + print 'Updated: %s' % datetime.datetime.now() def parse_args(): @@ -120,24 +134,26 @@ def parse_args(): parser.add_argument('--port', '-p', type=int, default=6379, help='The Redis portnumber (default: 6379)') parser.add_argument('--db', '-d', type=int, default=0, help='The Redis database (default: 0)') parser.add_argument('--path', '-P', default='.', help='Specify the import path.') - - parent_parser = argparse.ArgumentParser(add_help=False) - parent_parser.add_argument('--interval', '-i', metavar='N', type=float, default=0, help='Updates stats every N seconds (default: don\'t poll)') - - subparsers = parser.add_subparsers() - - queues_p = subparsers.add_parser('queues', parents=[parent_parser], help='Show queue info') - queues_p.add_argument('--raw', '-r', action='store_true', default=False, help='Print only the raw numbers, no bar charts') - queues_p.add_argument('queues', nargs='*', help='The queues to poll') - queues_p.set_defaults(func=show_queues) - - workers_p = subparsers.add_parser('workers', parents=[parent_parser], help='Show worker activity') - workers_p.add_argument('--by-queue', '-Q', dest='by_queue', default=False, action='store_true', help='Shows workers by queue') - workers_p.set_defaults(func=show_workers) - + parser.add_argument('--interval', '-i', metavar='N', type=float, default=2.5, help='Updates stats every N seconds (default: don\'t poll)') + parser.add_argument('--raw', '-r', action='store_true', default=False, help='Print only the raw numbers, no bar charts') + parser.add_argument('--only-queues', '-Q', dest='only_queues', default=False, action='store_true', help='Show only queue info') + parser.add_argument('--only-workers', '-W', dest='only_workers', default=False, action='store_true', help='Show only worker info') + parser.add_argument('--by-queue', '-R', dest='by_queue', default=False, action='store_true', help='Shows workers by queue') + parser.add_argument('queues', nargs='*', help='The queues to poll') return parser.parse_args() +def interval(val, func, args): + while True: + if val: + os.system('clear') + func(args) + if val: + time.sleep(val) + else: + break + + def main(): args = parse_args() @@ -148,7 +164,14 @@ def main(): redis_conn = redis.Redis(host=args.host, port=args.port, db=args.db) use_connection(redis_conn) try: - args.func(args) + if args.only_queues: + func = show_queues + elif args.only_workers: + func = show_workers + else: + func = show_both + + interval(args.interval, func, args) except ConnectionError as e: print(e)