From 546e999dbcffa5c68ef3642c36bcece8021d4ebf Mon Sep 17 00:00:00 2001 From: ed Date: Mon, 8 Jan 2018 02:03:33 +0100 Subject: [PATCH] concurrency fixes * clean shutdown * fix crash on client disconnect * workaround race in print builtin * deadlock diagnostics helper --- .gitignore | 3 +++ r0c.py | 61 ++++++++++++++++++++--------------------- r0c/c_netcat.py | 7 ++--- r0c/c_telnet.py | 14 ++++++---- r0c/c_vt100.py | 30 ++++++++++++++------- r0c/chat.py | 9 ++++--- r0c/config.py | 3 --- r0c/unrag.py | 1 + r0c/util.py | 72 +++++++++++++++++++++++++++++++++++++++++-------- test/mthread.py | 29 ++++++++++++++++++++ 10 files changed, 164 insertions(+), 65 deletions(-) create mode 100644 test/mthread.py diff --git a/.gitignore b/.gitignore index 6945799..04068b6 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,6 @@ __pycache__/ # sublime *.sublime-workspace + +# diagnostics +*.stack diff --git a/r0c.py b/r0c.py index 1502d12..1425dae 100644 --- a/r0c.py +++ b/r0c.py @@ -1,6 +1,6 @@ #!/usr/bin/env python2 # -*- coding: utf-8 -*- - +from __future__ import print_function """r0c.py: retr0chat Telnet/Netcat Server""" @@ -11,10 +11,8 @@ __license__ = "MIT" __copyright__ = 2018 - import sys import signal - if sys.version_info[0] == 2: sys.dont_write_bytecode = True @@ -26,13 +24,11 @@ from r0c.c_telnet import * from r0c.chat import * - if __name__ != '__main__': print('this is not a library') sys.exit(1) - class Core(object): def __init__(self): if len(sys.argv) != 3: @@ -51,40 +47,55 @@ class Core(object): print(' * Telnet server on port ' + str(self.telnet_port)) print(' * NetCat server on port ' + str(self.netcat_port)) - self.stopped = False self.stopping = False self.pushthr_alive = False self.asyncore_alive = False - self.p = Printer() - - self.p.p(' * Capturing ^C') + print(' * Capturing ^C') signal.signal(signal.SIGINT, self.signal_handler) - self.p.p(' * Creating world') + print(' * Creating world') self.world = World(self) - self.p.p(' * Starting Telnet server') - self.telnet_server = TelnetServer(self.p, '0.0.0.0', self.telnet_port, self.world) + print(' * Starting Telnet server') + self.telnet_server = TelnetServer('0.0.0.0', self.telnet_port, self.world) - self.p.p(' * Starting NetCat server') - self.netcat_server = NetcatServer(self.p, '0.0.0.0', self.netcat_port, self.world) + print(' * Starting NetCat server') + self.netcat_server = NetcatServer('0.0.0.0', self.netcat_port, self.world) - self.p.p(' * Starting push driver') + print(' * Starting push driver') self.push_thr = threading.Thread(target=self.push_worker, args=([self.telnet_server, self.netcat_server],)) #self.push_thr.daemon = True self.push_thr.start() - self.p.p(' * Handover to asyncore') + print(' * Handover to asyncore') self.asyncore_thr = threading.Thread(target=self.asyncore_worker) self.asyncore_thr.start() def run(self): - core.p.p(' * r0c is up') - while not self.stopped: + print(' * r0c is up') + + while not self.stopping: time.sleep(0.1) - core.p.p(' * bye') + + print(' * asyncore terminating') + clean_shutdown = False + for n in range(0, 40): # 2sec + if not self.asyncore_alive: + print(' * asyncore stopped cleanly') + clean_shutdown = True + break + time.sleep(0.05) + + if not clean_shutdown: + print(' -X- asyncore is stuck') + + print(' * asyncore cleanup') + self.netcat_server.close() + self.telnet_server.close() + + print(' * r0c is down') def asyncore_worker(self): @@ -125,19 +136,9 @@ class Core(object): def shutdown(self): + #monitor_threads() self.stopping = True - self.p.p(' * Stopping asyncore') - while self.asyncore_alive: - time.sleep(0.05) - - self.p.p(' * Terminating asyncore') - self.netcat_server.close() - self.telnet_server.close() - - self.p.p(' * r0c is down') - self.stopped = True - def signal_handler(self, signal, frame): self.shutdown() diff --git a/r0c/c_netcat.py b/r0c/c_netcat.py index 1c18acb..2bf3009 100644 --- a/r0c/c_netcat.py +++ b/r0c/c_netcat.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +from __future__ import print_function if __name__ == '__main__': raise RuntimeError('\n{0}\n{1}\n{2}\n{0}\n'.format('*'*72, ' this file is part of retr0chat', @@ -16,8 +17,8 @@ PY2 = (sys.version_info[0] == 2) class NetcatServer(VT100_Server): - def __init__(self, p, host, port, world): - VT100_Server.__init__(self, p, host, port, world) + def __init__(self, host, port, world): + VT100_Server.__init__(self, host, port, world) def gen_remote(self, socket, addr, user): return NetcatClient(self, socket, addr, self.world, user) @@ -36,7 +37,7 @@ class NetcatClient(VT100_Client): print('XXX reading when dead') return - data = self.recv(MSG_LEN) + data = self.recv(8192) if not data and not self.dead: self.host.part(self) return diff --git a/r0c/c_telnet.py b/r0c/c_telnet.py index 6973026..0428262 100644 --- a/r0c/c_telnet.py +++ b/r0c/c_telnet.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +from __future__ import print_function if __name__ == '__main__': raise RuntimeError('\n{0}\n{1}\n{2}\n{0}\n'.format('*'*72, ' this file is part of retr0chat', @@ -143,8 +144,8 @@ if not PY2: class TelnetServer(VT100_Server): - def __init__(self, p, host, port, world): - VT100_Server.__init__(self, p, host, port, world) + def __init__(self, host, port, world): + VT100_Server.__init__(self, host, port, world) def gen_remote(self, socket, addr, user): return TelnetClient(self, socket, addr, self.world, user) @@ -168,9 +169,12 @@ class TelnetClient(VT100_Client): print('XXX reading when dead') return - data = self.recv(MSG_LEN) - if not data and not self.dead: - self.host.part(self) + data = self.recv(8192) + if not data: + if not self.dead: + # seems like handle_close or handle_error gets + # called willy-nilly when somebody disconnects + self.host.part(self) return if HEXDUMP_IN: diff --git a/r0c/c_vt100.py b/r0c/c_vt100.py index 6cdad30..2aae119 100644 --- a/r0c/c_vt100.py +++ b/r0c/c_vt100.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +from __future__ import print_function if __name__ == '__main__': raise RuntimeError('\n{0}\n{1}\n{2}\n{0}\n'.format('*'*72, ' this file is part of retr0chat', @@ -26,9 +27,8 @@ else: class VT100_Server(asyncore.dispatcher): - def __init__(self, p, host, port, world): + def __init__(self, host, port, world): asyncore.dispatcher.__init__(self) - self.p = p self.world = world self.clients = [] self.create_socket(socket.AF_INET, socket.SOCK_STREAM) @@ -41,7 +41,7 @@ class VT100_Server(asyncore.dispatcher): self.listen(1) def con(self, msg, adr, add=0): - self.p.p(' {0} {1} {2} {3} :{4}'.format( + print(' {0} {1} {2} {3} :{4}'.format( msg, fmt(), len(self.clients)+add, adr[0], adr[1])) def gen_remote(self, socket, addr, user): @@ -65,9 +65,9 @@ class VT100_Server(asyncore.dispatcher): def part(self, remote): remote.dead = True with self.world.mutex: - print('='*72) - traceback.print_stack() - print('='*72) + #print('==[part]' + '='*72) + #traceback.print_stack() + #print('==[part]' + '='*71) remote.close() self.con(' -', remote.addr, -1) @@ -152,6 +152,7 @@ class VT100_Client(asyncore.dispatcher): self.add_esc(u'\x1b\x5b\x35\x7e', 'pgup') self.add_esc(u'\x1b\x5b\x36\x7e', 'pgdn') self.add_esc(u'\x08', 'bs') + self.add_esc(u'\x09', 'tab') self.add_esc(u'\x0d\x0a', 'ret') self.add_esc(u'\x0d\x00', 'ret') @@ -196,6 +197,9 @@ class VT100_Client(asyncore.dispatcher): def say(self, message): self.outbox.put(message) + def readable(self): + return not self.dead + def writable(self): #if self.slowmo_tx: # #print('x') @@ -211,7 +215,13 @@ class VT100_Client(asyncore.dispatcher): ) def handle_close(self): - self.host.part(self) + if not self.dead: + self.host.part(self) + + def handle_error(self): + whoops() + if not self.dead: + self.host.part(self) @@ -484,7 +494,7 @@ class VT100_Client(asyncore.dispatcher): ch = self.user.active_chan nch = ch.nchan - debug_scrolling = True + debug_scrolling = False #if not self.vt100: # if self.scroll_cmd is not None: @@ -949,7 +959,7 @@ class VT100_Client(asyncore.dispatcher): def conf_wizard(self): if DBG: if u'\x03' in self.in_text: - sys.exit(0) + self.world.core.shutdown() sep = u'{0}{1}{0}\033[2A'.format(u'\n', u'/'*71) ftop = u'\n'*20 + u'\033[H\033[J' @@ -1242,7 +1252,7 @@ class VT100_Client(asyncore.dispatcher): for ch in self.in_text: if DBG: if ch == '\x03': - sys.exit(0) + self.world.core.shutdown() was_esc = None if aside and aside in self.esc_tab: diff --git a/r0c/chat.py b/r0c/chat.py index 2f82dd2..d32e861 100644 --- a/r0c/chat.py +++ b/r0c/chat.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +from __future__ import print_function if __name__ == '__main__': raise RuntimeError('\n{0}\n{1}\n{2}\n{0}\n'.format('*'*72, ' this file is part of retr0chat', @@ -128,9 +129,9 @@ Text formatting: \033[36mCTRL-O\033[0m reset text formatting \033[36mCTRL-B\033[0m bold/bright text on/off \033[36mCTRL-K\033[0m followed by a colour code: - \033[36m2\033[0m \033[32mgreen\033[0m, - \033[36m3,1\033[0m \033[33;41myellow on red\033[0m -- - say \033[1m/cmap\033[0m to see all options + \033[36m2\033[0m \033[32mgreen\033[0m, + \033[36m3,1\033[0m \033[33;41myellow on red\033[0m -- + say \033[1m/cmap\033[0m to see all options Switching channels: \033[36mCTRL-A\033[0m jump to previous channel @@ -208,6 +209,8 @@ if you are using a mac, PgUp is fn-Shift-PgUp txt = u' message {0}\n mes {0}'.format(n) self.world.send_chan_msg(self.nick, nchan, txt) + #self.client.refresh(False) + def exec_cmd(self, cmd_str): diff --git a/r0c/config.py b/r0c/config.py index 7f0887c..76b1eef 100644 --- a/r0c/config.py +++ b/r0c/config.py @@ -22,7 +22,4 @@ SLOW_MOTION_TX = False FORCE_LINEMODE = True FORCE_LINEMODE = False - - -MSG_LEN = 8192 HEX_WIDTH = 16 diff --git a/r0c/unrag.py b/r0c/unrag.py index a8a7262..9c60846 100644 --- a/r0c/unrag.py +++ b/r0c/unrag.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +from __future__ import print_function if __name__ == '__main__': raise RuntimeError('\n{0}\n{1}\n{2}\n{0}\n'.format('*'*72, ' this file is part of retr0chat', diff --git a/r0c/util.py b/r0c/util.py index 3c1d19d..41e45cc 100644 --- a/r0c/util.py +++ b/r0c/util.py @@ -1,9 +1,11 @@ # -*- coding: utf-8 -*- +from __future__ import print_function if __name__ == '__main__': raise RuntimeError('\n{0}\n{1}\n{2}\n{0}\n'.format('*'*72, ' this file is part of retr0chat', ' run r0c.py instead')) +import traceback import threading import time import sys @@ -12,6 +14,20 @@ from .config import * PY2 = (sys.version_info[0] == 2) +print_mutex = threading.Lock() +if PY2: + import __builtin__ + def print(*args, **kwargs): + with print_mutex: + #__builtin__.print("y") + __builtin__.print(*args, **kwargs) +else: + import builtins + def print(*args, **kwargs): + with print_mutex: + #builtins.print("y") + builtins.print(*args, **kwargs) + def fmt(): return time.strftime('%d/%m/%Y, %H:%M:%S') @@ -253,14 +269,48 @@ def visualize_all_unicode_codepoints_as_utf8(): -class Printer(object): - - def __init__(self): - self.mutex = threading.Lock() - - def p(self, data): - with self.mutex: - #if len(data) < 13: - # data += ' ' * 13 - sys.stdout.write('{0}\n'.format(data)) - #sys.stdout.flush() +def whoops(): + msg = """\ + __ + _ __/ /_ ____ ____ ____ _____ + | | /| / / __ \/ __ \/ __ \/ __ \/ ___/ + | |/ |/ / / / / /_/ / /_/ / /_/ (__ ) + |__/|__/_/ /_/\____/\____/ .___/____/ + /_/""" + exc = traceback.format_exc() + if exc.startswith('None'): + exc = ''.join(traceback.format_stack()[:-1]) + msg = '{0}\n{1}\n{2}'.format( + msg, exc.rstrip(), '-'*64) + print(msg) + + + +thread_monitor_enabled = False +def monitor_threads(): + global thread_monitor_enabled + if thread_monitor_enabled: + return + thread_monitor_enabled = True + + def t_a_a_bt(): + ret = [] + for tid, stack in sys._current_frames().items(): + ret.append(u'\nThread {0} {1}'.format(tid, '='*64)) + for fn, lno, func, line in traceback.extract_stack(stack): + ret.append(u' File "{0}", line {1}, in {2}'.format(fn, lno, func)) + if line: + ret.append(u' {0}'.format(line.strip())) + return u'\n'.join(ret) + + def stack_collector(): + while True: + print('capturing stack') + time.sleep(5) + txt = t_a_a_bt() + with open('r0c.stack', 'wb') as f: + f.write(txt.encode('utf-8')) + + thr = threading.Thread(target=stack_collector) + thr.daemon = True + thr.start() diff --git a/test/mthread.py b/test/mthread.py new file mode 100644 index 0000000..42b01f2 --- /dev/null +++ b/test/mthread.py @@ -0,0 +1,29 @@ +# test case for multithreaded access to the print() builtin, +# the native print appears to have a mutex covering the main message +# but the 'end' variable is tacked on outside of it +# meaning you might lose the newline to a pending message + +import threading +import time + +nthr = 0 +msg = '\n'.join('{0}a'.format(' '*(x*2)) for x in range(30)) + +p_mutex = threading.Lock() +def p(*args, **kwargs): + with p_mutex: + print(*args, **kwargs) + +def worker(): + global nthr + nthr += 1 + while True: + p(nthr) + p(msg, end='1234567890\n') + +for n in range(4): + thr = threading.Thread(target=worker) + thr.daemon = True + thr.start() + +time.sleep(1)