From b84036a14154910617ae5b66888f50c3750b5d3c Mon Sep 17 00:00:00 2001 From: ed Date: Tue, 16 Jan 2018 20:05:36 +0100 Subject: [PATCH] fix single-byte codecs they would eat the inband control codes --- r0c/itelnet.py | 30 ++++++++++++++++++++---------- r0c/ivt100.py | 17 ++++++++++++----- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/r0c/itelnet.py b/r0c/itelnet.py index e4f319c..103aee3 100644 --- a/r0c/itelnet.py +++ b/r0c/itelnet.py @@ -184,37 +184,47 @@ class TelnetClient(VT100_Client): while self.in_bytes: len_at_start = len(self.in_bytes) - + decode_until = len_at_start + + # if the codec allows \xff as the 1st byte of a rune, + # make sure it doesn't consume it + if not self.inband_will_fail_decode: + ofs = self.in_bytes.find(b'\xff') + if ofs >= 0: + decode_until = ofs + try: - src = u'{0}'.format(self.in_bytes.decode(self.codec)) + src = u'{0}'.format(self.in_bytes[:decode_until].decode(self.codec)) #print('got {0} no prob'.format(src)) #print('got {0} runes: {1}'.format(len(src), # b2hex(src.encode('utf-8')))) - self.in_bytes = self.in_bytes[0:0] + self.in_bytes = self.in_bytes[decode_until:] except UnicodeDecodeError as uee: # first check whether the offending byte is an inband signal - if len(self.in_bytes) > uee.start and self.in_bytes[uee.start] == xff: + if decode_until > uee.start and self.in_bytes[uee.start] == xff: # it is, keep the text before it src = u'{0}'.format(self.in_bytes[:uee.start].decode(self.codec)) self.in_bytes = self.in_bytes[uee.start:] - elif len(self.in_bytes) < uee.start + 6 and self.codec != 'ascii': + elif decode_until < uee.start + 6 and self.multibyte_codec: print('need more data to parse unicode codepoint at {0} in {1} ...probably'.format( - uee.start, len(self.in_bytes))) - hexdump(self.in_bytes[-8:], 'XXX ') + uee.start, decode_until)) + hexdump(self.in_bytes[decode_until-8:], 'XXX ') return else: # it can't be helped - print('warning: unparseable data:') + print('warning: unparseable data before 
{0} in {1} total:'.format( + decode_until, len(self.in_bytes))) + hexdump(self.in_bytes, 'XXX ') - src = u'{0}'.format(self.in_bytes[:uee.start].decode(self.codec, 'backslashreplace')) - self.in_bytes = self.in_bytes[0:0] # todo: is this correct? + src = u'{0}'.format(self.in_bytes[:decode_until].decode(self.codec, 'backslashreplace')) + self.in_bytes = self.in_bytes[decode_until:] #self.linebuf = self.linebuf[:self.linepos] + src + self.linebuf[self.linepos:] #self.linepos += len(src) diff --git a/r0c/ivt100.py b/r0c/ivt100.py index 21cc84d..658f560 100644 --- a/r0c/ivt100.py +++ b/r0c/ivt100.py @@ -103,7 +103,7 @@ class VT100_Client(asyncore.dispatcher): self.echo_on = False # set true by buffy clients self.vt100 = True # set nope by butty clients self.slowmo_tx = SLOW_MOTION_TX - self.codec = 'utf-8' + self.set_codec('utf-8') # outgoing data self.outbox = Queue() @@ -187,6 +187,13 @@ class VT100_Client(asyncore.dispatcher): + def set_codec(self, codec_name): + multibyte = ['utf-8','shift_jis'] + ff_illegal = ['utf-8','shift_jis'] + self.codec = codec_name + self.multibyte_codec = self.codec in multibyte + self.inband_will_fail_decode = self.codec in ff_illegal + def handshake_timeout(self): time.sleep(1) self.handshake_sz = True @@ -1155,18 +1162,18 @@ class VT100_Client(asyncore.dispatcher): self.linemode = True self.echo_on = True self.vt100 = False - self.codec = 'cp437' + self.set_codec('cp437') self.wizard_stage = 'end' # cheatcode: windows telnet + join elif self.in_text.startswith('wtn'): - self.codec = 'cp437' + self.set_codec('cp437') self.wizard_stage = 'end' join_ch = self.in_text[3:] # cheatcode: linux telnet + join elif self.in_text.startswith('ltn'): - self.codec = 'utf-8' + self.set_codec('utf-8') self.wizard_stage = 'end' join_ch = self.in_text[3:] @@ -1352,7 +1359,7 @@ class VT100_Client(asyncore.dispatcher): for n, letter in enumerate(AZ[:int(2+len(encs)/2)].lower()): if letter in text: self.wizard_stage = 'end' - self.codec = encs[n*2] + 
self.set_codec(encs[n*2]) break