concurrency fixes

* clean shutdown
* fix crash on client disconnect
* workaround race in print builtin
* deadlock diagnostics helper
This commit is contained in:
ed 2018-01-08 02:03:33 +01:00
parent db99e50cb0
commit 546e999dbc
10 changed files with 164 additions and 65 deletions

3
.gitignore vendored
View File

@ -5,3 +5,6 @@ __pycache__/
# sublime
*.sublime-workspace
# diagnostics
*.stack

61
r0c.py
View File

@ -1,6 +1,6 @@
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
from __future__ import print_function
"""r0c.py: retr0chat Telnet/Netcat Server"""
@ -11,10 +11,8 @@ __license__ = "MIT"
__copyright__ = 2018
import sys
import signal
if sys.version_info[0] == 2:
sys.dont_write_bytecode = True
@ -26,13 +24,11 @@ from r0c.c_telnet import *
from r0c.chat import *
if __name__ != '__main__':
print('this is not a library')
sys.exit(1)
class Core(object):
def __init__(self):
if len(sys.argv) != 3:
@ -51,40 +47,55 @@ class Core(object):
print(' * Telnet server on port ' + str(self.telnet_port))
print(' * NetCat server on port ' + str(self.netcat_port))
self.stopped = False
self.stopping = False
self.pushthr_alive = False
self.asyncore_alive = False
self.p = Printer()
self.p.p(' * Capturing ^C')
print(' * Capturing ^C')
signal.signal(signal.SIGINT, self.signal_handler)
self.p.p(' * Creating world')
print(' * Creating world')
self.world = World(self)
self.p.p(' * Starting Telnet server')
self.telnet_server = TelnetServer(self.p, '0.0.0.0', self.telnet_port, self.world)
print(' * Starting Telnet server')
self.telnet_server = TelnetServer('0.0.0.0', self.telnet_port, self.world)
self.p.p(' * Starting NetCat server')
self.netcat_server = NetcatServer(self.p, '0.0.0.0', self.netcat_port, self.world)
print(' * Starting NetCat server')
self.netcat_server = NetcatServer('0.0.0.0', self.netcat_port, self.world)
self.p.p(' * Starting push driver')
print(' * Starting push driver')
self.push_thr = threading.Thread(target=self.push_worker, args=([self.telnet_server, self.netcat_server],))
#self.push_thr.daemon = True
self.push_thr.start()
self.p.p(' * Handover to asyncore')
print(' * Handover to asyncore')
self.asyncore_thr = threading.Thread(target=self.asyncore_worker)
self.asyncore_thr.start()
def run(self):
core.p.p(' * r0c is up')
while not self.stopped:
print(' * r0c is up')
while not self.stopping:
time.sleep(0.1)
core.p.p(' * bye')
print(' * asyncore terminating')
clean_shutdown = False
for n in range(0, 40): # 2sec
if not self.asyncore_alive:
print(' * asyncore stopped cleanly')
clean_shutdown = True
break
time.sleep(0.05)
if not clean_shutdown:
print(' -X- asyncore is stuck')
print(' * asyncore cleanup')
self.netcat_server.close()
self.telnet_server.close()
print(' * r0c is down')
def asyncore_worker(self):
@ -125,19 +136,9 @@ class Core(object):
def shutdown(self):
#monitor_threads()
self.stopping = True
self.p.p(' * Stopping asyncore')
while self.asyncore_alive:
time.sleep(0.05)
self.p.p(' * Terminating asyncore')
self.netcat_server.close()
self.telnet_server.close()
self.p.p(' * r0c is down')
self.stopped = True
def signal_handler(self, signal, frame):
self.shutdown()

View File

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
if __name__ == '__main__':
raise RuntimeError('\n{0}\n{1}\n{2}\n{0}\n'.format('*'*72,
' this file is part of retr0chat',
@ -16,8 +17,8 @@ PY2 = (sys.version_info[0] == 2)
class NetcatServer(VT100_Server):
def __init__(self, p, host, port, world):
VT100_Server.__init__(self, p, host, port, world)
def __init__(self, host, port, world):
VT100_Server.__init__(self, host, port, world)
def gen_remote(self, socket, addr, user):
return NetcatClient(self, socket, addr, self.world, user)
@ -36,7 +37,7 @@ class NetcatClient(VT100_Client):
print('XXX reading when dead')
return
data = self.recv(MSG_LEN)
data = self.recv(8192)
if not data and not self.dead:
self.host.part(self)
return

View File

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
if __name__ == '__main__':
raise RuntimeError('\n{0}\n{1}\n{2}\n{0}\n'.format('*'*72,
' this file is part of retr0chat',
@ -143,8 +144,8 @@ if not PY2:
class TelnetServer(VT100_Server):
def __init__(self, p, host, port, world):
VT100_Server.__init__(self, p, host, port, world)
def __init__(self, host, port, world):
VT100_Server.__init__(self, host, port, world)
def gen_remote(self, socket, addr, user):
return TelnetClient(self, socket, addr, self.world, user)
@ -168,9 +169,12 @@ class TelnetClient(VT100_Client):
print('XXX reading when dead')
return
data = self.recv(MSG_LEN)
if not data and not self.dead:
self.host.part(self)
data = self.recv(8192)
if not data:
if not self.dead:
# seems like handle_close or handle_error gets
# called willy-nilly when somebody disconnects
self.host.part(self)
return
if HEXDUMP_IN:

View File

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
if __name__ == '__main__':
raise RuntimeError('\n{0}\n{1}\n{2}\n{0}\n'.format('*'*72,
' this file is part of retr0chat',
@ -26,9 +27,8 @@ else:
class VT100_Server(asyncore.dispatcher):
def __init__(self, p, host, port, world):
def __init__(self, host, port, world):
asyncore.dispatcher.__init__(self)
self.p = p
self.world = world
self.clients = []
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
@ -41,7 +41,7 @@ class VT100_Server(asyncore.dispatcher):
self.listen(1)
def con(self, msg, adr, add=0):
self.p.p(' {0} {1} {2} {3} :{4}'.format(
print(' {0} {1} {2} {3} :{4}'.format(
msg, fmt(), len(self.clients)+add, adr[0], adr[1]))
def gen_remote(self, socket, addr, user):
@ -65,9 +65,9 @@ class VT100_Server(asyncore.dispatcher):
def part(self, remote):
remote.dead = True
with self.world.mutex:
print('='*72)
traceback.print_stack()
print('='*72)
#print('==[part]' + '='*72)
#traceback.print_stack()
#print('==[part]' + '='*71)
remote.close()
self.con(' -', remote.addr, -1)
@ -152,6 +152,7 @@ class VT100_Client(asyncore.dispatcher):
self.add_esc(u'\x1b\x5b\x35\x7e', 'pgup')
self.add_esc(u'\x1b\x5b\x36\x7e', 'pgdn')
self.add_esc(u'\x08', 'bs')
self.add_esc(u'\x09', 'tab')
self.add_esc(u'\x0d\x0a', 'ret')
self.add_esc(u'\x0d\x00', 'ret')
@ -196,6 +197,9 @@ class VT100_Client(asyncore.dispatcher):
def say(self, message):
self.outbox.put(message)
def readable(self):
return not self.dead
def writable(self):
#if self.slowmo_tx:
# #print('x')
@ -211,7 +215,13 @@ class VT100_Client(asyncore.dispatcher):
)
def handle_close(self):
self.host.part(self)
if not self.dead:
self.host.part(self)
def handle_error(self):
whoops()
if not self.dead:
self.host.part(self)
@ -484,7 +494,7 @@ class VT100_Client(asyncore.dispatcher):
ch = self.user.active_chan
nch = ch.nchan
debug_scrolling = True
debug_scrolling = False
#if not self.vt100:
# if self.scroll_cmd is not None:
@ -949,7 +959,7 @@ class VT100_Client(asyncore.dispatcher):
def conf_wizard(self):
if DBG:
if u'\x03' in self.in_text:
sys.exit(0)
self.world.core.shutdown()
sep = u'{0}{1}{0}\033[2A'.format(u'\n', u'/'*71)
ftop = u'\n'*20 + u'\033[H\033[J'
@ -1242,7 +1252,7 @@ class VT100_Client(asyncore.dispatcher):
for ch in self.in_text:
if DBG:
if ch == '\x03':
sys.exit(0)
self.world.core.shutdown()
was_esc = None
if aside and aside in self.esc_tab:

View File

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
if __name__ == '__main__':
raise RuntimeError('\n{0}\n{1}\n{2}\n{0}\n'.format('*'*72,
' this file is part of retr0chat',
@ -128,9 +129,9 @@ Text formatting:
\033[36mCTRL-O\033[0m reset text formatting
\033[36mCTRL-B\033[0m bold/bright text on/off
\033[36mCTRL-K\033[0m followed by a colour code:
\033[36m2\033[0m \033[32mgreen\033[0m,
\033[36m3,1\033[0m \033[33;41myellow on red\033[0m --
say \033[1m/cmap\033[0m to see all options
\033[36m2\033[0m \033[32mgreen\033[0m,
\033[36m3,1\033[0m \033[33;41myellow on red\033[0m --
say \033[1m/cmap\033[0m to see all options
Switching channels:
\033[36mCTRL-A\033[0m jump to previous channel
@ -208,6 +209,8 @@ if you are using a mac, PgUp is fn-Shift-PgUp
txt = u' message {0}\n mes {0}'.format(n)
self.world.send_chan_msg(self.nick, nchan, txt)
#self.client.refresh(False)
def exec_cmd(self, cmd_str):

View File

@ -22,7 +22,4 @@ SLOW_MOTION_TX = False
FORCE_LINEMODE = True
FORCE_LINEMODE = False
MSG_LEN = 8192
HEX_WIDTH = 16

View File

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
if __name__ == '__main__':
raise RuntimeError('\n{0}\n{1}\n{2}\n{0}\n'.format('*'*72,
' this file is part of retr0chat',

View File

@ -1,9 +1,11 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
if __name__ == '__main__':
raise RuntimeError('\n{0}\n{1}\n{2}\n{0}\n'.format('*'*72,
' this file is part of retr0chat',
' run r0c.py instead'))
import traceback
import threading
import time
import sys
@ -12,6 +14,20 @@ from .config import *
PY2 = (sys.version_info[0] == 2)
print_mutex = threading.Lock()
if PY2:
import __builtin__
def print(*args, **kwargs):
with print_mutex:
#__builtin__.print("y")
__builtin__.print(*args, **kwargs)
else:
import builtins
def print(*args, **kwargs):
with print_mutex:
#builtins.print("y")
builtins.print(*args, **kwargs)
def fmt():
return time.strftime('%d/%m/%Y, %H:%M:%S')
@ -253,14 +269,48 @@ def visualize_all_unicode_codepoints_as_utf8():
class Printer(object):
def __init__(self):
self.mutex = threading.Lock()
def p(self, data):
with self.mutex:
#if len(data) < 13:
# data += ' ' * 13
sys.stdout.write('{0}\n'.format(data))
#sys.stdout.flush()
def whoops():
msg = """\
__
_ __/ /_ ____ ____ ____ _____
| | /| / / __ \/ __ \/ __ \/ __ \/ ___/
| |/ |/ / / / / /_/ / /_/ / /_/ (__ )
|__/|__/_/ /_/\____/\____/ .___/____/
/_/"""
exc = traceback.format_exc()
if exc.startswith('None'):
exc = ''.join(traceback.format_stack()[:-1])
msg = '{0}\n{1}\n{2}</stack>'.format(
msg, exc.rstrip(), '-'*64)
print(msg)
thread_monitor_enabled = False
def monitor_threads():
global thread_monitor_enabled
if thread_monitor_enabled:
return
thread_monitor_enabled = True
def t_a_a_bt():
ret = []
for tid, stack in sys._current_frames().items():
ret.append(u'\nThread {0} {1}'.format(tid, '='*64))
for fn, lno, func, line in traceback.extract_stack(stack):
ret.append(u' File "{0}", line {1}, in {2}'.format(fn, lno, func))
if line:
ret.append(u' {0}'.format(line.strip()))
return u'\n'.join(ret)
def stack_collector():
while True:
print('capturing stack')
time.sleep(5)
txt = t_a_a_bt()
with open('r0c.stack', 'wb') as f:
f.write(txt.encode('utf-8'))
thr = threading.Thread(target=stack_collector)
thr.daemon = True
thr.start()

29
test/mthread.py Normal file
View File

@ -0,0 +1,29 @@
# test case for multithreaded access to the print() builtin,
# the native print appears to have a mutex covering the main message
# but the 'end' variable is tacked on outside of it
# meaning you might lose the newline to a pending message
import threading
import time
nthr = 0
msg = '\n'.join('{0}a'.format(' '*(x*2)) for x in range(30))
p_mutex = threading.Lock()
def p(*args, **kwargs):
with p_mutex:
print(*args, **kwargs)
def worker():
global nthr
nthr += 1
while True:
p(nthr)
p(msg, end='1234567890\n')
for n in range(4):
thr = threading.Thread(target=worker)
thr.daemon = True
thr.start()
time.sleep(1)