#!/usr/bin/python3 import socket from hexdump import hexdump, dump from enum import Enum from collections import OrderedDict import logger import pkm import crystal # The states that the Mobile Adapter GB can be in class TransferState(Enum): Waiting = 0 # Waiting for the first byte of the preamble (0x99). Preamble = 1 # Expecting the second byte of the preamble (0x66). PacketStart = 2 # Expecting the packet start. Packet01 = 3 # Expecting packet offset 0x01 (unused?) Packet02 = 4 # Expecting packet offset 0x02 (unused?) PacketLen = 5 # Expecting the packet length. PacketBody = 6 # Expecting the packet body. Checksum1 = 7 # Expecting the first byte of the checksum. Checksum2 = 8 # Expecting the second byte of the checksum. DeviceID = 9 # Expecting the device ID. StatusByte = 10 # Expecting the status byte (0x00 for sender, 0x80 ^ packetID for receiver) # Main class for the communication class MobileAdapterGB: def __init__(self): # BGB Connection self.ip = '127.0.0.1' self.port = 8765 self.sock = None # Link Cable self.recv_count = 0 self.sent_count = 0 self.ticks_count = 0 self.frames_count = 0 # Mobile Adapter GB self.state = TransferState.Waiting self.is_sender = False self.packet_data = {'id': 0, 'size': 0, 'data': [], 'checksum': 0} self.line_busy = False self.ma_port = 0 self.http_ready = True self.pop_begun = False self.response_text = bytearray() # POP try: with open('email.txt','rb') as f: self.email = f.read() except FileNotFoundError: logger.log.critical('File email.txt not found!') # HTTP try: with open('index.html','rb') as f: self.http_ex = f.read() except FileNotFoundError: logger.log.critical('File index.html not found!') # Store the various responses to each path (the domain cannot be changed) self.http_text = bytearray() self.http_responses = {} # XXX self.http_responses[b'GET /cgb/download?name=/01/CGB-BXTJ/POKESTA/menu.cgb HTTP/1.0'] = { 'response': b'HTTP/1.0 200 OK', 'headers': {}, 'content': b'FINDMEFINDME' } # Mobile Trainer Homepage, loaded from index.html self.http_responses[b'GET /01/CGB-B9AJ/index.html HTTP/1.0'] = { 'response': b'HTTP/1.0 200 OK', 'headers': {}, 'content': self.http_ex } # Used by Pokemon Crystal to send Pokemon away to be traded self.http_responses[b'GET /cgb/download?name=/01/CGB-BXTJ/exchange/index.txt HTTP/1.0'] = { 'response': b'HTTP/1.0 200 OK', 'headers': {}, 'content': ( b'http://gameboy.datacenter.ne.jp/cgb/upload?name=/01/CGB-BXTJ/exchange/10upload.cgi\r\n' b'http://gameboy.datacetner.ne.jp/cgb/upload?name=/01/CGB-BXTJ/exchange/cancel.cgi\r\n' )} # During upload, we have to return a 401. Later there's another request to this path with the Gb-Auth-ID # header set which is handled separately further down self.http_responses[b'GET /cgb/upload?name=/01/CGB-BXTJ/exchange/10upload.cgi HTTP/1.0'] = { 'response': b'HTTP/1.0 401 Unauthorized', 'headers': {'Gb-Auth-ID': b'HAIL GIOVANNI'}, 'content': b'' } # Just reply 200 to this request - part of the upload process for trading Pokemon self.http_responses[b'POST /cgb/upload?name=/01/CGB-BXTJ/exchange/10upload.cgi HTTP/1.0'] = { 'response': b'HTTP/1.0 200 OK', 'headers': {}, 'content': b'' } # If something goes wrong we will get a POST to this path self.http_responses[b'POST /cgb/upload?name=/01/CGB-BXTJ/exchange/cancel.cgi HTTP/1.0'] = { 'response': b'HTTP/1.0 200 OK', 'headers': {}, 'content': b'' } # XXX self.http_responses[b'GET /cgb/download?name=/01/CGB-BXTJ/tamago/index.txt HTTP/1.0'] = { 'response': b'HTTP/1.0 200 OK', 'headers': {}, 'content': ( b'http://gameboy.datacenter.ne.jp/cgb/download?name=/01/CGB-BXTJ/tamago/tamagoXX.pkm\r\n' b'0ccc170a2e1447ad5eb778518ccca147b0a3bfffd1eae3d6f0a2ffff\r\n' )} # Odd Eggs odd_eggs = [] # Pichu odd_eggs.append( bytes.fromhex( 'AC 00 54 CC 92 00 00 08 00 00 7D 00 00 00 00 00' '00 00 00 00 00 00 00 1E 14 0A 00 14 00 00 00 05' '00 00 00 00 00 11 00 09 00 06 00 0B 00 08 00 08' '8F 9D 09 50 50 50') ) # XXX ??? odd_eggs.append( bytes.fromhex( 'AC 00 54 CC 92 00 00 01 00 00 7D 00 00 00 00 00' '00 00 00 00 00 2A AA 1E 14 0A 00 14 00 00 00 05' '00 00 00 00 00 11 00 09 00 07 00 0C 00 09 00 09' '8F 9D 09 50 50 50') ) # Cleffa odd_eggs.append( bytes.fromhex( 'AD 00 01 CC 92 00 00 10 00 00 7D 00 00 00 00 00' '00 00 00 00 00 00 00 23 14 0A 00 14 00 00 00 05' '00 00 00 00 00 14 00 07 00 07 00 06 00 09 00 0A' '8F 9D 09 50 50 50') ) # XXX ??? odd_eggs.append( bytes.fromhex( 'AD 00 01 CC 92 00 00 03 00 00 7D 00 00 00 00 00' '00 00 00 00 00 2A AA 23 14 0A 00 14 00 00 00 05' '00 00 00 00 00 14 00 07 00 08 00 07 00 0A 00 0B' '8F 9D 09 50 50 50') ) # Igglypugg odd_eggs.append( bytes.fromhex( 'AE 00 2F CC 92 00 00 10 00 00 7D 00 00 00 00 00' '00 00 00 00 00 00 00 0F 14 0A 00 14 00 00 00 05' '00 00 00 00 00 18 00 08 00 06 00 06 00 09 00 07' '8F 9D 09 50 50 50') ) # XXX ??? odd_eggs.append( bytes.fromhex( 'AE 00 2F CC 92 00 00 03 00 00 7D 00 00 00 00 00' '00 00 00 00 00 2A AA 0F 14 0A 00 14 00 00 00 05' '00 00 00 00 00 18 00 08 00 07 00 07 00 0A 00 08' '8F 9D 09 50 50 50') ) # Smoochum odd_eggs.append( bytes.fromhex( 'EE 00 01 7A 92 00 00 0E 00 00 7D 00 00 00 00 00' '00 00 00 00 00 00 00 23 1E 0A 00 14 00 00 00 05' '00 00 00 00 00 13 00 08 00 06 00 0B 00 0D 00 0B' '8F 9D 09 50 50 50') ) # XXX ??? odd_eggs.append( bytes.fromhex( 'EE 00 01 7A 92 00 00 02 00 00 7D 00 00 00 00 00' '00 00 00 00 00 2A AA 23 1E 0A 00 14 00 00 00 05' '00 00 00 00 00 13 00 08 00 07 00 0C 00 0E 00 0C' '8F 9D 09 50 50 50') ) # Magby odd_eggs.append( bytes.fromhex( 'F0 00 34 92 00 00 00 0A 00 00 7D 00 00 00 00 00' '00 00 00 00 00 00 00 19 0A 00 00 14 00 00 00 05' '00 00 00 00 00 13 00 0C 00 08 00 0D 00 0C 00 0A' '8F 9D 09 50 50 50') ) # XXX ??? odd_eggs.append( bytes.fromhex( 'F0 00 34 92 00 00 00 02 00 00 7D 00 00 00 00 00' '00 00 00 00 00 2A AA 19 0A 00 00 14 00 00 00 05' '00 00 00 00 00 13 00 0C 00 09 00 0E 00 0D 00 0B' '8F 9D 09 50 50 50') ) # Elekid odd_eggs.append( bytes.fromhex( 'EF 00 62 2B 92 00 00 0C 00 00 7D 00 00 00 00 00' '00 00 00 00 00 00 00 1E 1E 0A 00 14 00 00 00 05' '00 00 00 00 00 13 00 0B 00 08 00 0E 00 0B 00 0A' '8F 9D 09 50 50 50') ) # XXX ??? odd_eggs.append( bytes.fromhex( 'EF 00 62 2B 92 00 00 02 00 00 7D 00 00 00 00 00' '00 00 00 00 00 2A AA 1E 1E 0A 00 14 00 00 00 05' '00 00 00 00 00 13 00 0B 00 09 00 0F 00 0C 00 0B' '8F 9D 09 50 50 50') ) # Tyrogue odd_eggs.append( bytes.fromhex( 'EC 00 21 92 00 00 00 0A 00 00 7D 00 00 00 00 00' '00 00 00 00 00 00 00 23 0A 00 00 14 00 00 00 05' '00 00 00 00 00 12 00 08 00 08 00 08 00 08 00 08' '8F 9D 09 50 50 50') ) # XXX ??? odd_eggs.append( bytes.fromhex( 'EC 00 21 92 00 00 00 01 00 00 7D 00 00 00 00 00' '00 00 00 00 00 2A AA 23 0A 00 00 14 00 00 00 05' '00 00 00 00 00 12 00 08 00 09 00 09 00 09 00 09' '8F 9D 09 50 50 50') ) # XXX for egg in odd_eggs: new_url = 'GET /cgb/download?name=/01/CGB-BXTJ/tamago/tamago%02x.pkm HTTP/1.0' % odd_eggs.index(egg) self.http_responses[new_url.encode()] = { 'response': b'HTTP/1.0 200 OK', 'headers': {}, 'content': egg } return # Where the business happens def main(self): self.connect() while True: try: recv = self.sock.recv( 1024 ) self.on_recv_cb(recv) except KeyboardInterrupt: exit() return # Connect locally to the BGB link cable listener def connect(self): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: self.sock.connect( (self.ip, self.port) ) except ConnectionRefusedError: logger.log.critical(f'Cannot connect to port {self.port}. Is BGB listening?') return # Hardcoded status indicates we're ready def status(self): status = [0x6a, 0, 0, 0, 0, 0, 0, 0] self.ticks_count += 1 self.frames_count += 8 status[2] = self.ticks_count % 256 status[3] = (self.ticks_count // 256) % 256 status[5] = self.frames_count % 256 status[6] = (self.frames_count // 256) % 256 status[7] = (self.frames_count // 256 // 256) % 256 return bytes(status) # Callback everytime we receive a packet def on_recv_cb(self, pkt): resp = None # Sometimes (on error?) we get an empty packet if len(pkt) == 0: return # Link Cable Response if pkt[0] == 0x01: logger.log.debug(f'[<] {dump(pkt)}') self.send_pkt(pkt) self.send_pkt(b'\x6c\x03\x00\x00\x00\x00\x00\x00') elif pkt[0] == 0x6c: logger.log.debug(f'[<] {dump(pkt)}') self.send_pkt(b'\x6c\x01\x00\x00\x00\x00\x00\x00') self.send_pkt(self.status()) elif pkt[0] == 0x65: logger.log.debug(f'[<] {dump(pkt)}') pass elif pkt[0] == 0x6a: self.send_pkt(self.status(), log=False) elif pkt[0] == 0x68 or pkt[0] == 0x69: logger.log.debug(f'[<] {dump(pkt)}') # Mobile Adapter Packet self.recv_count += 1 self.sent_count += 1 resp = list(pkt) resp[1] = self.mobile_adapter_byte(resp[1]) self.send_pkt(bytes(resp)) self.send_pkt(self.status()) else: logger.log.warning(f'UNHANDLED: {dump(pkt)}') return # Send the packet off to the GameBoy def send_pkt(self, pkt, log=True): if log == True: logger.log.debug(f'[>] {dump(pkt)}') self.sock.send(pkt) return # Deal with the data byte of each Mobile Adapter GB packet def mobile_adapter_byte(self, b): # We are sending if self.is_sender: # Waiting -> Preamble if self.state == TransferState.Waiting: self.update_state(TransferState.Preamble) return 0x99 # Preamble -> PacketStart elif self.state == TransferState.Preamble: self.update_state(TransferState.PacketStart) return 0x66 # PacketStart -> Packet01 elif self.state == TransferState.PacketStart: self.update_state(TransferState.Packet01) return self.packet_data['id'] # Packet01 -> Packet02 elif self.state == TransferState.Packet01: self.update_state(TransferState.Packet02) return 0x00 # Packet02 -> PacketLen elif self.state == TransferState.Packet02: self.update_state(TransferState.PacketLen) return 0x00 # PacketLen -> PacketBody # PacketLen -> Checksum1 elif self.state == TransferState.PacketLen: if self.packet_data['size'] > 0: self.update_state(TransferState.PacketBody) else: self.update_state(TransferState.Checksum1) return self.packet_data['size'] # PacketBody -> Checksum1 elif self.state == TransferState.PacketBody: self.packet_data['size'] -= 1 if self.packet_data['size'] == 0: self.update_state(TransferState.Checksum1) return self.packet_data['data'][-1 - self.packet_data['size']] # Checksum1 -> Checksum2 elif self.state == TransferState.Checksum1: self.update_state(TransferState.Checksum2) return self.packet_data['checksum'] >> 8 # Checksum2 -> DeviceID elif self.state == TransferState.Checksum2: self.update_state(TransferState.DeviceID) return self.packet_data['checksum'] & 0xff # DeviceID -> StatusByte elif self.state == TransferState.DeviceID: self.update_state(TransferState.StatusByte) """ ID: e800 / 9000 0x88: 0x1 / 0xfe 0x89: 0x2 / 0xfd 0x8a: 0x3 / 0xfc 0x8b: 0x4 / 0xfb 0x8c: 0x5 / 0xfa 0x8d: 0x6 / 0xf9 0x8e: 0x7 / 0xf8 0x8f: 0x8 / 0xf7 """ return 0x88 # StatusByte -> Waiting elif self.state == TransferState.StatusByte: self.update_state(TransferState.Waiting) self.is_sender = False return 0x00 # We are receiving else: # Waiting -> Preamble if self.state == TransferState.Waiting: if b == 0x99: self.update_state(TransferState.Preamble) # Reset Packet self.packet_data = {'id': 0, 'data': bytearray(), 'checksum': 0} # Preamble -> PacketStart # Preamble -> Waiting elif self.state == TransferState.Preamble: if b == 0x66: self.update_state(TransferState.PacketStart) else: self.update_state(TransferState.Waiting) return 0xf1 # PacketStart -> Packet01 elif self.state == TransferState.PacketStart: self.packet_data['id'] = b self.update_state(TransferState.Packet01) # Packet01 -> Packet02 elif self.state == TransferState.Packet01: self.update_state(TransferState.Packet02) # Packet02 -> PacketLen elif self.state == TransferState.Packet02: self.update_state(TransferState.PacketLen) # PacketLen -> PacketBody # PacketLen -> Checksum1 elif self.state == TransferState.PacketLen: self.packet_data['size'] = b if self.packet_data['size'] > 0: self.update_state(TransferState.PacketBody) else: self.update_state(TransferState.Checksum1) # PacketBody -> Checksum1 elif self.state == TransferState.PacketBody: self.packet_data['data'].append(b) self.packet_data['size'] -= 1 if self.packet_data['size'] == 0: self.update_state(TransferState.Checksum1) # Checksum1 -> Checksum2 elif self.state == TransferState.Checksum1: self.packet_data['checksum'] = b << 8 self.update_state(TransferState.Checksum2) # Checksum2 -> DeviceID elif self.state == TransferState.Checksum2: self.packet_data['checksum'] += b self.update_state(TransferState.DeviceID) # DeviceID -> StatusByte elif self.state == TransferState.DeviceID: self.update_state(TransferState.StatusByte) # StatusByte -> Waiting elif self.state == TransferState.StatusByte: self.update_state(TransferState.Waiting) self.is_sender = True return self.mobile_adapter_response() return 0x4b # Decide how to respond, parsing the full message as received def mobile_adapter_response(self): ret = 0x80 ^ self.packet_data['id'] # 0x10: BEGIN SESSION if self.packet_data['id'] == 0x10: logger.log.info(f'0x10: Opening Session ({self.packet_data["data"].decode()})') self.ma_port = 0 # Respond with the same data # 0x11: END SESSION elif self.packet_data['id'] == 0x11: logger.log.info(f'0x11: Closing Session') self.ma_port = 0 self.line_busy = False # Respond with the same data # 0x12: DIAL TELEPHONE elif self.packet_data['id'] == 0x12: logger.log.info(f'0x12: Dialling {self.packet_data["data"][1:].decode()}') # Respond with empty body self.packet_data['data'] = bytearray() self.line_busy = True # 0x13: HANG UP elif self.packet_data['id'] == 0x13: logger.log.info(f'0x13: Hanging Up') self.line_busy = False # Empty out anything left in response_text self.response_text = bytearray() # Respond with same data # 0x14: WAITING FOR CALL elif self.packet_data['id'] == 0x14: logger.log.info(f'0x13: Waiting for Call') # Respond with same data # 0x15: TRANSFER DATA elif self.packet_data['id'] == 0x15: # POP if self.ma_port == 110: if len(self.packet_data['data']) <= 1: logger.log.debug(f'0x15: No POP Traffic Received') else: logger.log.info(f'0x15: POP Recv: {self.packet_data["data"][1:-2].decode()}') self.packet_data['id'] = 0x95 # 0x80 ^ 0x15 self.packet_data['data'] = bytearray(b'\x00') + self.pop_response() if len(self.packet_data['data']) <= 1: logger.log.debug(f'0x95: No POP Traffic To Send') else: logger.log.info(f'0x95: POP Send: {self.packet_data["data"][1:-2].decode()}') # HTTP elif self.ma_port == 80: if len(self.packet_data['data']) <= 1: logger.log.debug(f'0x15: No HTTP Traffic Received') else: if self.packet_data['data'][-4:] == b'\r\n\r\n': logger.log.info(f'0x15: HTTP Recv: {self.http_text.decode()}') if self.http_ready or len(self.response_text) > 0: self.packet_data['id'] = 0x95 self.packet_data['data'] = bytearray(b'\x00') + self.http_response() if len(self.packet_data['data']) <= 1: logger.log.debug(f'0x95: No HTTP Traffic To Send') else: logger.log.info(f'0x95: HTTP Send: {len(self.packet_data["data"])} bytes') else: self.packet_data['id'] = 0x9f self.packet_data['data'] = bytearray(b'\x00') logger.log.info('0x95: HTTP Server Closed Connection') # Pokemon Crystal Battle elif self.ma_port == 0: self.packet_data = crystal.battle(self.packet_data).reply else: logger.log.warning(f'0x15: Unknown Protocol on port {self.ma_port}') logger.log.warning(f'Echoing data and hoping for the best!') # 0x17: TELEPHONE STATUS elif self.packet_data['id'] == 0x17: logger.log.info(f'0x17: Check Telephone Line') if self.line_busy: logger.log.info(f'0x17: Line Busy') # Setting the third byte to 0xf0 indicates that we're using the unlimited battle adapter self.packet_data['data'] = bytearray(b'\x05\x4d\xf0') else: logger.log.info(f'0x17: Line Free') # Setting the third byte to 0xf0 indicates that we're using the unlimited battle adapter self.packet_data['data'] = bytearray(b'\x00\x4d\xf0') # 0x19: READ CONFIGURATION DATA elif self.packet_data['id'] == 0x19: self.read_config() offset = self.packet_data['data'][0] length = self.packet_data['data'][1] logger.log.info(f'0x19: Read Config: {length} bytes @ {offset}') hexdump( self.config[offset: offset + length] ) self.packet_data['data'] = bytearray([offset]) + self.config[ offset: offset + length ] # 0x1a: WRITE CONFIGURATION DATA elif self.packet_data['id'] == 0x1a: offset = self.packet_data['data'][0] length = len(self.packet_data['data']) - 1 logger.log.info(f'0x1a: Write Config: {length} bytes @ {offset}') hexdump( self.packet_data['data'][1:] ) self.config[offset: offset+length] = self.packet_data['data'][1:] self.write_config() # Send empty response self.packet_data['data'] = bytearray() # 0x21: ISP LOGIN elif self.packet_data['id'] == 0x21: logger.log.info(f'0x21: Log in to DION') # Send empty response to signal success (lol) self.packet_data['data'] = bytearray(b'\x00') # 0x22: ISP LOGOUT elif self.packet_data['id'] == 0x22: logger.log.info(f'0x22: Log out of DION') self.ma_port = 0 # Respond with the same packet # 0x23: OPEN TCP CONNECTION elif self.packet_data['id'] == 0x23: self.ma_port = (self.packet_data['data'][4] << 8) + self.packet_data['data'][5] ip_string = f'{self.packet_data["data"][0]}.{self.packet_data["data"][1]}.{self.packet_data["data"][2]}.{self.packet_data["data"][3]}' logger.log.info(f'0x23: Open TCP Connection to {ip_string}:{self.ma_port}') # Respond with success self.packet_data['id'] = 0xa3 self.packet_data['data'] = bytearray(b'\xff') self.http_ready = True self.pop_begun = False # 0x24: CLOSE TCP CONNECTION elif self.packet_data['id'] == 0x24: logger.log.info(f'0x24: Close TCP Connection') self.ma_port = 0 self.response_text = bytearray() # Respond with the same packet # 0x28: DNS QUERY elif self.packet_data['id'] == 0x28: logger.log.info(f'0x28: DNS Query for {self.packet_data["data"].decode()}') self.packet_data['data'] = bytearray(b'\x13\x37\x13\x37\x00') # Hopefully we never hit this, but if we do, something interesting is probably going on else: logger.log.warning(f'UNKNOWN COMMAND: {hex(self.packet_data["id"])}') self.packet_data['size'] = len(self.packet_data['data']) checksum = self.packet_data['id'] + self.packet_data['size'] for byte in self.packet_data['data']: checksum += byte self.packet_data['checksum'] = checksum return ret # Build a valid POP3 response def pop_response(self): pop_text = bytearray() if len(self.response_text) == 0: if len(self.packet_data['data']) > 1: pop_text = self.packet_data['data'][1:] if pop_text.find(b'STAT') == 0 or pop_text.find(b'LIST 1') == 0: self.response_text += f'+OK 1 {len(self.email)}\r\n'.encode() elif pop_text.find(b'LIST ') == 0: self.response_text += b'-ERR\r\n' elif pop_text.find(b'LIST') == 0: self.response_text += f'+OK\r\n1 {len(self.email)}\r\n.\r\n'.encode() # Email headers only elif pop_text.find(b'TOP 1 0') == 0: self.response_text += b'+OK\r\n' + self.email.split(b'\r\n\r\n')[0] + b'\r\n\r\n.\r\n' elif pop_text.find(b'RETR 1') == 0: self.response_text += b'+OK\r\n' + self.email + b'\r\n.\r\n' # Reply +OK at the start of any session or to any other command elif len(pop_text) > 0 or not self.pop_begun: self.pop_begun = True self.response_text += b'+OK\r\n' # Something went wrong? else: logger.log.error('Got unhandled POP Command: {pop_text}') self.response_text += b'-ERR\r\n' bytes_to_send = min(254, len(self.response_text)) text_to_send = self.response_text[:bytes_to_send] self.response_text = self.response_text[bytes_to_send:] return text_to_send # Build a valid HTTP response def http_response(self): if len(self.response_text) == 0: if len(self.packet_data['data']) > 1: self.http_text += self.packet_data['data'][1:] http_data = self.parse_http(self.http_text) if 'request' in http_data: # If this is a POST, is it done or is there more data? if http_data['request'].find(b'POST') == 0: if 'Content-Length' in http_data['headers']: content_length = int(http_data['headers']['Content-Length']) # Request is done? if len(http_data['content']) >= content_length: self.http_ready = False # This is a GET, so we're definitely done else: self.http_ready = False if not self.http_ready: # Clear self.http_text before next request self.http_text = bytearray() response = self.http_responses.get( bytes(http_data['request']) ) if http_data['request'] == b'GET /cgb/upload?name=/01/CGB-BXTJ/exchange/10upload.cgi HTTP/1.0': if 'Gb-Auth-ID' in http_data['headers']: response = { 'response': b'HTTP/1.0 200 OK', 'headers': {'Gb-Auth-ID': b'HAIL GIOVANNI'}, 'content':b'\r\n' } # If we hit here, we got asked for something we don't know how to reply to. The game will probably # error out, so go try to implement what it asked for ;) if response is None: logger.log.warning(f'No response known for {http_data["request"].decode()}') self.response_text = b'HTTP/1.0 404 Not Found\r\n\r\n' else: self.response_text = response['response'] + b'\r\n' for header, value in response['headers'].items(): self.response_text += header.encode() + b': ' + value + b'\r\n' self.response_text += b'\r\n' + response['content'] # Can only send 254 bytes at a time bytes_to_send = min(254, len(self.response_text)) text_to_send = self.response_text[:bytes_to_send] self.response_text = self.response_text[bytes_to_send:] return text_to_send # Parse the incoming HTTP request def parse_http(self, recv): http_data = {} # Is this a complete request? if b'\r\n' in recv: http_data['request'] = recv.split(b'\r\n')[0] http_data['headers'] = {} # Deal with headers if recv.find(b'\r\n') < recv.find(b'\r\n\r\n'): headers = recv[ recv.find(b'\r\n') + 2 : recv.find(b'\r\n\r\n') ] headers = headers.split(b'\r\n') for header in headers: header = header.split(b': ') http_data['headers'][header[0].decode()] = header[1] http_data['content'] = recv[ recv.find(b'\r\n\r\n') + 4: ] # Assume we have a Pokemon request so print it nicely if http_data['request'].find(b'POST') == 0 and len(http_data['content']) > 0: hexdump(http_data['content']) from pprint import pprint pprint( pkm.parse_pokemon_request( http_data['content'] ).data ) return http_data # Update the current transfer state def update_state(self, new_state): logger.log.debug(f'State Update: [{self.state}] --> [{new_state}]') self.state = new_state return # Load the config file def read_config(self): try: with open('config.bin','rb') as f: self.config = bytearray(f.read()) except FileNotFoundError: with open('config.bin','wb') as f: self.config = b'\x00' * 192 f.write(self.config) return # Write out the config file it's changed def write_config(self): with open('config.bin','wb') as f: f.write(self.config) return # Parse the configuration file class parse_config: def __init__(self, fname): try: with open(fname, 'rb') as f: self.config = f.read() except FileNotFoundError: logger.log.critical(f'Error: {fname} not found!') if len(self.config) != 192: logger.log.critical(f'Error: config data not 192 bytes, have {len(self.config)}') self.parse() return def parse(self): self.data = OrderedDict() self.data['magic'] = self.config[0x0:0x0+2] self.data['reg'] = self.config[0x2:0x2+2] self.data['dns1'] = self.parse_ip( self.config[0x4:0x4+4] ) self.data['dns2'] = self.parse_ip( self.config[0x8:0x8+4] ) self.data['login'] = self.config[0xc:0xc+32] self.data['email'] = self.config[0x2c:0x2c+30] self.data['smtp'] = self.config[0x4a:0x4a+20] self.data['pop'] = self.config[0x5e:0x5e+24] self.data['conf1'] = self.config[0x76:0x76+24] self.data['conf2'] = self.config[0x8e:0x8e+24] self.data['conf3'] = self.config[0xa6:0xa6+24] self.data['checksum'] = self.config[0xbe:0xbe+2] if self.data['reg'] == b'\x01\x00': self.data['reg'] = 'NOT REGISTERED' elif self.data['reg'] == b'\x81\x00': self.data['reg'] = 'REGISTERED' self.data['login'] = self.data['login'].rstrip(b'\x00').decode() self.data['email'] = self.data['email'].rstrip(b'\x00').decode() self.data['smtp'] = self.data['smtp'].rstrip(b'\x00').decode() self.data['pop'] = self.data['pop'].rstrip(b'\x00').decode() self.data['conf1'] = self.parse_config_data( self.data['conf1'] ) self.data['conf2'] = self.parse_config_data( self.data['conf2'] ) self.data['conf3'] = self.parse_config_data( self.data['conf3'] ) sum = 0 for i in range(190): sum += self.config[i] sum = int.to_bytes(sum, 2, 'big') if sum != self.data['checksum']: logger.log.error(f'Checksum incorrect! Got {self.data["checksum"]}, should be {sum}') return # Render IP address as a string def parse_ip(self, b): return f'{b[0]}.{b[1]}.{b[2]}.{b[3]}' # Parse the data part of the configuration file def parse_config_data(self, b): tel = bytearray() for i in b[:8]: b1 = i >> 4 if b1 == 0xf: break tel += int.to_bytes(b1 + 0x30, 1, 'big') b2 = i & 0xf if b2 == 0xf: break tel += int.to_bytes(b2 + 0x30, 1, 'big') for i in range(len(tel)): if tel[i] == 0x3a: tel[i] = ord('#') return {'tel': tel.decode(), 'svc': b[8:].rstrip(b'\x00').decode()} def print(self): logger.log.info('Configuration Data:') logger.log.info(f'Registration State: {self.data["reg"]}') logger.log.info(f'Primary DNS: {self.data["dns1"]}') logger.log.info(f'Secondary DNS: {self.data["dns2"]}') logger.log.info(f'Login: {self.data["login"]}') logger.log.info(f'Email: {self.data["email"]}') logger.log.info(f'SMTP: {self.data["smtp"]}') logger.log.info(f'POP: {self.data["pop"]}') logger.log.info(f'Slot 1: {self.data["conf1"]["svc"]}: {self.data["conf1"]["tel"]}') logger.log.info(f'Slot 2: {self.data["conf2"]["svc"]}: {self.data["conf2"]["tel"]}') logger.log.info(f'Slot 3: {self.data["conf3"]["svc"]}: {self.data["conf3"]["tel"]}') return if __name__ == "__main__": #logger.log.setLevel(logger.logging.DEBUG) obj = MobileAdapterGB() obj.main()