tetsuji/mobile_adapter.py

825 lines
34 KiB
Python
Executable File

#!/usr/bin/python3
import socket
from hexdump import hexdump, dump
from enum import Enum
from collections import OrderedDict
import logger
import pkm
import crystal
# The states that the Mobile Adapter GB can be in
class TransferState(Enum):
    """Which field of a Mobile Adapter GB packet the link is currently at.

    The members are numbered in the order the fields travel over the link.
    """

    # Idle: nothing seen yet, looking for the first preamble byte (0x99).
    Waiting = 0
    # First preamble byte seen; the second one (0x66) is due.
    Preamble = 1
    # Preamble complete; the packet start byte is due.
    PacketStart = 2
    # Packet offset 0x01 (unused?).
    Packet01 = 3
    # Packet offset 0x02 (unused?).
    Packet02 = 4
    # The packet length byte is due.
    PacketLen = 5
    # The packet body bytes are due.
    PacketBody = 6
    # First checksum byte is due.
    Checksum1 = 7
    # Second checksum byte is due.
    Checksum2 = 8
    # The device ID byte is due.
    DeviceID = 9
    # The status byte is due (0x00 for sender, 0x80 ^ packetID for receiver).
    StatusByte = 10
# Main class for the communication
class MobileAdapterGB:
def __init__(self):
    """Initialise connection, link-cable and Mobile Adapter GB state.

    Reads ``email.txt`` (served over POP) and ``index.html`` (served over
    HTTP) from the current directory and builds ``self.http_responses``,
    a table mapping a raw HTTP request line to the canned reply for it.
    A missing file is logged as critical and replaced by empty content so
    that the attributes always exist.
    """
    def load_file(fname):
        # Return the file's bytes, or b'' (after logging) when it is absent.
        try:
            with open(fname, 'rb') as f:
                return f.read()
        except FileNotFoundError:
            logger.log.critical(f'File {fname} not found!')
            return b''

    # BGB Connection
    self.ip = '127.0.0.1'
    self.port = 8765
    self.sock = None
    # Link Cable
    self.recv_count = 0
    self.sent_count = 0
    self.ticks_count = 0
    self.frames_count = 0
    # Mobile Adapter GB
    self.state = TransferState.Waiting
    self.is_sender = False
    self.packet_data = {'id': 0, 'size': 0, 'data': [], 'checksum': 0}
    self.line_busy = False
    self.ma_port = 0
    self.http_ready = True
    self.pop_begun = False
    self.response_text = bytearray()
    # POP
    self.email = load_file('email.txt')
    # HTTP
    self.http_ex = load_file('index.html')
    # Store the various responses to each path (the domain cannot be changed)
    self.http_text = bytearray()
    self.http_responses = {}
    # XXX
    self.http_responses[b'GET /cgb/download?name=/01/CGB-BXTJ/POKESTA/menu.cgb HTTP/1.0'] = {
        'response': b'HTTP/1.0 200 OK',
        'headers': {},
        'content': b'FINDMEFINDME',
    }
    # Mobile Trainer Homepage, loaded from index.html
    self.http_responses[b'GET /01/CGB-B9AJ/index.html HTTP/1.0'] = {
        'response': b'HTTP/1.0 200 OK',
        'headers': {},
        'content': self.http_ex,
    }
    # Used by Pokemon Crystal to send Pokemon away to be traded.
    # (The second URL previously read "datacetner"; the hostname is now
    # spelled the same way as in every other served URL.)
    self.http_responses[b'GET /cgb/download?name=/01/CGB-BXTJ/exchange/index.txt HTTP/1.0'] = {
        'response': b'HTTP/1.0 200 OK',
        'headers': {},
        'content': (
            b'http://gameboy.datacenter.ne.jp/cgb/upload?name=/01/CGB-BXTJ/exchange/10upload.cgi\r\n'
            b'http://gameboy.datacenter.ne.jp/cgb/upload?name=/01/CGB-BXTJ/exchange/cancel.cgi\r\n'
        ),
    }
    # During upload, we have to return a 401. Later there's another request to this path with the Gb-Auth-ID
    # header set which is handled separately in http_response()
    self.http_responses[b'GET /cgb/upload?name=/01/CGB-BXTJ/exchange/10upload.cgi HTTP/1.0'] = {
        'response': b'HTTP/1.0 401 Unauthorized',
        'headers': {'Gb-Auth-ID': b'HAIL GIOVANNI'},
        'content': b'',
    }
    # Just reply 200 to this request - part of the upload process for trading Pokemon
    self.http_responses[b'POST /cgb/upload?name=/01/CGB-BXTJ/exchange/10upload.cgi HTTP/1.0'] = {
        'response': b'HTTP/1.0 200 OK',
        'headers': {},
        'content': b'',
    }
    # If something goes wrong we will get a POST to this path
    self.http_responses[b'POST /cgb/upload?name=/01/CGB-BXTJ/exchange/cancel.cgi HTTP/1.0'] = {
        'response': b'HTTP/1.0 200 OK',
        'headers': {},
        'content': b'',
    }
    # XXX
    self.http_responses[b'GET /cgb/download?name=/01/CGB-BXTJ/tamago/index.txt HTTP/1.0'] = {
        'response': b'HTTP/1.0 200 OK',
        'headers': {},
        'content': (
            b'http://gameboy.datacenter.ne.jp/cgb/download?name=/01/CGB-BXTJ/tamago/tamagoXX.pkm\r\n'
            b'0ccc170a2e1447ad5eb778518ccca147b0a3bfffd1eae3d6f0a2ffff\r\n'
        ),
    }
    # Odd Eggs: 54-byte records, served in this order as tamago00..tamago0d
    odd_eggs = [
        # Pichu
        bytes.fromhex('AC 00 54 CC 92 00 00 08 00 00 7D 00 00 00 00 00'
                      '00 00 00 00 00 00 00 1E 14 0A 00 14 00 00 00 05'
                      '00 00 00 00 00 11 00 09 00 06 00 0B 00 08 00 08'
                      '8F 9D 09 50 50 50'),
        # XXX ???
        bytes.fromhex('AC 00 54 CC 92 00 00 01 00 00 7D 00 00 00 00 00'
                      '00 00 00 00 00 2A AA 1E 14 0A 00 14 00 00 00 05'
                      '00 00 00 00 00 11 00 09 00 07 00 0C 00 09 00 09'
                      '8F 9D 09 50 50 50'),
        # Cleffa
        bytes.fromhex('AD 00 01 CC 92 00 00 10 00 00 7D 00 00 00 00 00'
                      '00 00 00 00 00 00 00 23 14 0A 00 14 00 00 00 05'
                      '00 00 00 00 00 14 00 07 00 07 00 06 00 09 00 0A'
                      '8F 9D 09 50 50 50'),
        # XXX ???
        bytes.fromhex('AD 00 01 CC 92 00 00 03 00 00 7D 00 00 00 00 00'
                      '00 00 00 00 00 2A AA 23 14 0A 00 14 00 00 00 05'
                      '00 00 00 00 00 14 00 07 00 08 00 07 00 0A 00 0B'
                      '8F 9D 09 50 50 50'),
        # Igglybuff
        bytes.fromhex('AE 00 2F CC 92 00 00 10 00 00 7D 00 00 00 00 00'
                      '00 00 00 00 00 00 00 0F 14 0A 00 14 00 00 00 05'
                      '00 00 00 00 00 18 00 08 00 06 00 06 00 09 00 07'
                      '8F 9D 09 50 50 50'),
        # XXX ???
        bytes.fromhex('AE 00 2F CC 92 00 00 03 00 00 7D 00 00 00 00 00'
                      '00 00 00 00 00 2A AA 0F 14 0A 00 14 00 00 00 05'
                      '00 00 00 00 00 18 00 08 00 07 00 07 00 0A 00 08'
                      '8F 9D 09 50 50 50'),
        # Smoochum
        bytes.fromhex('EE 00 01 7A 92 00 00 0E 00 00 7D 00 00 00 00 00'
                      '00 00 00 00 00 00 00 23 1E 0A 00 14 00 00 00 05'
                      '00 00 00 00 00 13 00 08 00 06 00 0B 00 0D 00 0B'
                      '8F 9D 09 50 50 50'),
        # XXX ???
        bytes.fromhex('EE 00 01 7A 92 00 00 02 00 00 7D 00 00 00 00 00'
                      '00 00 00 00 00 2A AA 23 1E 0A 00 14 00 00 00 05'
                      '00 00 00 00 00 13 00 08 00 07 00 0C 00 0E 00 0C'
                      '8F 9D 09 50 50 50'),
        # Magby
        bytes.fromhex('F0 00 34 92 00 00 00 0A 00 00 7D 00 00 00 00 00'
                      '00 00 00 00 00 00 00 19 0A 00 00 14 00 00 00 05'
                      '00 00 00 00 00 13 00 0C 00 08 00 0D 00 0C 00 0A'
                      '8F 9D 09 50 50 50'),
        # XXX ???
        bytes.fromhex('F0 00 34 92 00 00 00 02 00 00 7D 00 00 00 00 00'
                      '00 00 00 00 00 2A AA 19 0A 00 00 14 00 00 00 05'
                      '00 00 00 00 00 13 00 0C 00 09 00 0E 00 0D 00 0B'
                      '8F 9D 09 50 50 50'),
        # Elekid
        bytes.fromhex('EF 00 62 2B 92 00 00 0C 00 00 7D 00 00 00 00 00'
                      '00 00 00 00 00 00 00 1E 1E 0A 00 14 00 00 00 05'
                      '00 00 00 00 00 13 00 0B 00 08 00 0E 00 0B 00 0A'
                      '8F 9D 09 50 50 50'),
        # XXX ???
        bytes.fromhex('EF 00 62 2B 92 00 00 02 00 00 7D 00 00 00 00 00'
                      '00 00 00 00 00 2A AA 1E 1E 0A 00 14 00 00 00 05'
                      '00 00 00 00 00 13 00 0B 00 09 00 0F 00 0C 00 0B'
                      '8F 9D 09 50 50 50'),
        # Tyrogue
        bytes.fromhex('EC 00 21 92 00 00 00 0A 00 00 7D 00 00 00 00 00'
                      '00 00 00 00 00 00 00 23 0A 00 00 14 00 00 00 05'
                      '00 00 00 00 00 12 00 08 00 08 00 08 00 08 00 08'
                      '8F 9D 09 50 50 50'),
        # XXX ???
        bytes.fromhex('EC 00 21 92 00 00 00 01 00 00 7D 00 00 00 00 00'
                      '00 00 00 00 00 2A AA 23 0A 00 00 14 00 00 00 05'
                      '00 00 00 00 00 12 00 08 00 09 00 09 00 09 00 09'
                      '8F 9D 09 50 50 50'),
    ]
    # One download URL per egg; the two-digit hex suffix is the list position.
    # enumerate() keeps the numbering positional even if two records were equal.
    for index, egg in enumerate(odd_eggs):
        new_url = 'GET /cgb/download?name=/01/CGB-BXTJ/tamago/tamago%02x.pkm HTTP/1.0' % index
        self.http_responses[new_url.encode()] = {
            'response': b'HTTP/1.0 200 OK',
            'headers': {},
            'content': egg,
        }
    return
# Where the business happens
def main(self):
    """Connect to BGB and pump link packets until the link ends.

    Bytes read from the socket are buffered and handed to
    ``on_recv_cb`` one 8-byte packet at a time, because a single TCP
    read may carry several packets or only part of one.  The loop ends
    when BGB closes the connection (``recv`` returns no data); Ctrl-C
    exits the process.  The socket is closed on the way out.
    """
    packet_size = 8  # every BGB link packet is exactly 8 bytes long
    self.connect()
    pending = b''
    try:
        while True:
            chunk = self.sock.recv(1024)
            if not chunk:
                # Empty read == peer closed the connection; retrying would
                # only spin on the same empty result forever.
                break
            pending += chunk
            while len(pending) >= packet_size:
                packet, pending = pending[:packet_size], pending[packet_size:]
                self.on_recv_cb(packet)
    except KeyboardInterrupt:
        raise SystemExit
    finally:
        self.sock.close()
    return
# Connect locally to the BGB link cable listener
def connect(self):
    """Open a TCP connection to the BGB link listener at ``self.ip:self.port``.

    Raises:
        ConnectionRefusedError: nothing is listening on the port.  The
            failure is logged and the socket closed before re-raising, so
            the caller never goes on to read from an unconnected socket.
    """
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        self.sock.connect((self.ip, self.port))
    except ConnectionRefusedError:
        logger.log.critical(f'Cannot connect to port {self.port}. Is BGB listening?')
        self.sock.close()
        raise
    return
# Hardcoded status indicates we're ready
def status(self):
    """Advance the tick/frame counters and return an 8-byte 0x6a packet.

    Each call adds 1 to ``ticks_count`` and 8 to ``frames_count``.  The
    packet layout is: 0x6a, 0x00, ticks (low 16 bits, little-endian),
    0x00, frames (low 24 bits, little-endian).
    """
    self.ticks_count += 1
    self.frames_count += 8
    tick_field = (self.ticks_count % 0x10000).to_bytes(2, 'little')
    frame_field = (self.frames_count % 0x1000000).to_bytes(3, 'little')
    return bytes([0x6a, 0]) + tick_field + bytes([0]) + frame_field
# Callback invoked every time we receive a packet
def on_recv_cb(self, pkt):
    """Handle one link packet, dispatching on its first byte.

    * empty packet: ignored
    * 0x01: echoed back, followed by a fixed 0x6c/0x03 packet
    * 0x6c: answered with a fixed 0x6c/0x01 packet and a status packet
    * 0x65: logged only
    * 0x6a: answered with a status packet (not logged)
    * 0x68 / 0x69: byte 1 is run through ``mobile_adapter_byte`` and the
      packet is sent back with that byte replaced, then a status packet
    * anything else: logged as unhandled
    """
    if len(pkt) == 0:
        # Sometimes (on error?) we get an empty packet
        return
    command = pkt[0]
    if command == 0x6a:
        self.send_pkt(self.status(), log=False)
        return
    if command not in (0x01, 0x6c, 0x65, 0x68, 0x69):
        logger.log.warning(f'UNHANDLED: {dump(pkt)}')
        return
    # Every remaining command is logged before it is acted upon.
    logger.log.debug(f'[<] {dump(pkt)}')
    if command == 0x01:
        # Link Cable Response
        self.send_pkt(pkt)
        self.send_pkt(b'\x6c\x03\x00\x00\x00\x00\x00\x00')
    elif command == 0x6c:
        self.send_pkt(b'\x6c\x01\x00\x00\x00\x00\x00\x00')
        self.send_pkt(self.status())
    elif command in (0x68, 0x69):
        # Mobile Adapter Packet
        self.recv_count += 1
        self.sent_count += 1
        reply = list(pkt)
        reply[1] = self.mobile_adapter_byte(reply[1])
        self.send_pkt(bytes(reply))
        self.send_pkt(self.status())
    # 0x65 needs no reply.
    return
# Send the packet off to the GameBoy
def send_pkt(self, pkt, log=True):
    """Send ``pkt`` to the Game Boy side of the link.

    Args:
        pkt: bytes-like packet to transmit.
        log: when truthy, the packet is written to the debug log first.

    ``sendall`` is used rather than ``send`` because ``send`` is allowed
    to transmit only part of the buffer.
    """
    if log:
        logger.log.debug(f'[>] {dump(pkt)}')
    self.sock.sendall(pkt)
    return
# Deal with the data byte of each Mobile Adapter GB packet
def mobile_adapter_byte(self, b):
    """Feed one received data byte through the packet state machine.

    While ``self.is_sender`` is set, ``b`` is ignored and the next byte of
    the prepared reply in ``self.packet_data`` is returned.  Otherwise
    ``b`` is consumed as the next byte of an incoming packet; once the
    whole packet has been read the reply is prepared by
    ``mobile_adapter_response`` and its return value is returned.

    Returns:
        The byte to send back.  0x4b is returned whenever no branch
        produces a more specific value; 0xf1 is returned when the second
        preamble byte is not 0x66.
    """
    current = self.state

    if self.is_sender:
        outgoing = self.packet_data
        # Steps that emit a constant and move on: state -> (next state, byte).
        constant_steps = {
            TransferState.Waiting: (TransferState.Preamble, 0x99),
            TransferState.Preamble: (TransferState.PacketStart, 0x66),
            TransferState.Packet01: (TransferState.Packet02, 0x00),
            TransferState.Packet02: (TransferState.PacketLen, 0x00),
            # Original author's notes next to the 0x88 device ID byte:
            #   ID: e800 / 9000
            #   0x88: 0x1 / 0xfe
            #   0x89: 0x2 / 0xfd
            #   0x8a: 0x3 / 0xfc
            #   0x8b: 0x4 / 0xfb
            #   0x8c: 0x5 / 0xfa
            #   0x8d: 0x6 / 0xf9
            #   0x8e: 0x7 / 0xf8
            #   0x8f: 0x8 / 0xf7
            TransferState.DeviceID: (TransferState.StatusByte, 0x88),
        }
        if current in constant_steps:
            following, value = constant_steps[current]
            self.update_state(following)
            return value
        if current == TransferState.PacketStart:
            self.update_state(TransferState.Packet01)
            return outgoing['id']
        if current == TransferState.PacketLen:
            # An empty body skips straight to the checksum.
            if outgoing['size'] > 0:
                self.update_state(TransferState.PacketBody)
            else:
                self.update_state(TransferState.Checksum1)
            return outgoing['size']
        if current == TransferState.PacketBody:
            # 'size' counts the bytes still to go, so indexing from the end
            # walks the body front to back.
            outgoing['size'] -= 1
            left = outgoing['size']
            if left == 0:
                self.update_state(TransferState.Checksum1)
            return outgoing['data'][-1 - left]
        if current == TransferState.Checksum1:
            self.update_state(TransferState.Checksum2)
            return outgoing['checksum'] >> 8
        if current == TransferState.Checksum2:
            self.update_state(TransferState.DeviceID)
            return outgoing['checksum'] & 0xff
        if current == TransferState.StatusByte:
            # Reply fully sent: go back to listening.
            self.update_state(TransferState.Waiting)
            self.is_sender = False
            return 0x00
        return 0x4b

    # We are receiving
    if current == TransferState.Waiting:
        if b == 0x99:
            self.update_state(TransferState.Preamble)
            # Start a fresh packet ('size' is filled in at PacketLen)
            self.packet_data = {'id': 0, 'data': bytearray(), 'checksum': 0}
    elif current == TransferState.Preamble:
        if b == 0x66:
            self.update_state(TransferState.PacketStart)
        else:
            self.update_state(TransferState.Waiting)
            return 0xf1
    elif current == TransferState.PacketStart:
        self.packet_data['id'] = b
        self.update_state(TransferState.Packet01)
    elif current == TransferState.Packet01:
        self.update_state(TransferState.Packet02)
    elif current == TransferState.Packet02:
        self.update_state(TransferState.PacketLen)
    elif current == TransferState.PacketLen:
        self.packet_data['size'] = b
        if b > 0:
            self.update_state(TransferState.PacketBody)
        else:
            self.update_state(TransferState.Checksum1)
    elif current == TransferState.PacketBody:
        incoming = self.packet_data
        incoming['data'].append(b)
        incoming['size'] -= 1
        if incoming['size'] == 0:
            self.update_state(TransferState.Checksum1)
    elif current == TransferState.Checksum1:
        self.packet_data['checksum'] = b << 8
        self.update_state(TransferState.Checksum2)
    elif current == TransferState.Checksum2:
        self.packet_data['checksum'] += b
        self.update_state(TransferState.DeviceID)
    elif current == TransferState.DeviceID:
        self.update_state(TransferState.StatusByte)
    elif current == TransferState.StatusByte:
        # Packet complete: build the reply and switch to sending it.
        self.update_state(TransferState.Waiting)
        self.is_sender = True
        return self.mobile_adapter_response()
    return 0x4b
# Decide how to respond, parsing the full message as received
def mobile_adapter_response(self):
    """Turn the fully received packet into the reply packet, in place.

    ``self.packet_data`` holds the command that was just received.  It is
    rewritten to hold the reply (``id``, ``data``, ``size``, ``checksum``),
    which ``mobile_adapter_byte`` then sends out byte by byte.  Commands
    that leave ``data`` untouched are answered with the data they carried.

    Returns:
        ``0x80 ^ id`` of the *received* command.
    """
    def text(raw):
        # Payloads are only decoded for log output; undecodable bytes
        # (binary data, a multi-byte character cut at a chunk boundary)
        # must not abort the transfer.
        return raw.decode(errors='replace')

    packet = self.packet_data
    command = packet['id']
    ret = 0x80 ^ command
    data = packet['data']

    # 0x10: BEGIN SESSION
    if command == 0x10:
        logger.log.info(f'0x10: Opening Session ({text(data)})')
        self.ma_port = 0
        # Respond with the same data
    # 0x11: END SESSION
    elif command == 0x11:
        logger.log.info('0x11: Closing Session')
        self.ma_port = 0
        self.line_busy = False
        # Respond with the same data
    # 0x12: DIAL TELEPHONE
    elif command == 0x12:
        logger.log.info(f'0x12: Dialling {text(data[1:])}')
        # Respond with empty body
        packet['data'] = bytearray()
        self.line_busy = True
    # 0x13: HANG UP
    elif command == 0x13:
        logger.log.info('0x13: Hanging Up')
        self.line_busy = False
        # Empty out anything left in response_text
        self.response_text = bytearray()
        # Respond with same data
    # 0x14: WAITING FOR CALL
    elif command == 0x14:
        logger.log.info('0x14: Waiting for Call')
        # Respond with same data
    # 0x15: TRANSFER DATA
    elif command == 0x15:
        # POP
        if self.ma_port == 110:
            if len(data) <= 1:
                logger.log.debug('0x15: No POP Traffic Received')
            else:
                logger.log.info(f'0x15: POP Recv: {text(data[1:-2])}')
            packet['id'] = 0x95  # 0x80 ^ 0x15
            # pop_response() still sees the received data here: the right
            # hand side is evaluated before 'data' is replaced.
            packet['data'] = bytearray(b'\x00') + self.pop_response()
            if len(packet['data']) <= 1:
                logger.log.debug('0x95: No POP Traffic To Send')
            else:
                logger.log.info(f'0x95: POP Send: {text(packet["data"][1:-2])}')
        # HTTP
        elif self.ma_port == 80:
            if len(data) <= 1:
                logger.log.debug('0x15: No HTTP Traffic Received')
            elif data[-4:] == b'\r\n\r\n':
                logger.log.info(f'0x15: HTTP Recv: {text(self.http_text)}')
            if self.http_ready or len(self.response_text) > 0:
                packet['id'] = 0x95
                packet['data'] = bytearray(b'\x00') + self.http_response()
                if len(packet['data']) <= 1:
                    logger.log.debug('0x95: No HTTP Traffic To Send')
                else:
                    logger.log.info(f'0x95: HTTP Send: {len(packet["data"])} bytes')
            else:
                # Request answered and nothing left to send: reply 0x9f
                packet['id'] = 0x9f
                packet['data'] = bytearray(b'\x00')
                logger.log.info('0x95: HTTP Server Closed Connection')
        # Pokemon Crystal Battle
        elif self.ma_port == 0:
            self.packet_data = crystal.battle(self.packet_data).reply
        else:
            logger.log.warning(f'0x15: Unknown Protocol on port {self.ma_port}')
            logger.log.warning('Echoing data and hoping for the best!')
    # 0x17: TELEPHONE STATUS
    elif command == 0x17:
        logger.log.info('0x17: Check Telephone Line')
        # Setting the third byte to 0xf0 indicates that we're using the unlimited battle adapter
        if self.line_busy:
            logger.log.info('0x17: Line Busy')
            packet['data'] = bytearray(b'\x05\x4d\xf0')
        else:
            logger.log.info('0x17: Line Free')
            packet['data'] = bytearray(b'\x00\x4d\xf0')
    # 0x19: READ CONFIGURATION DATA
    elif command == 0x19:
        self.read_config()
        offset = data[0]
        length = data[1]
        logger.log.info(f'0x19: Read Config: {length} bytes @ {offset}')
        hexdump(self.config[offset: offset + length])
        packet['data'] = bytearray([offset]) + self.config[offset: offset + length]
    # 0x1a: WRITE CONFIGURATION DATA
    elif command == 0x1a:
        offset = data[0]
        length = len(data) - 1
        logger.log.info(f'0x1a: Write Config: {length} bytes @ {offset}')
        hexdump(data[1:])
        # A write may arrive before any read has loaded the config, and the
        # buffer must be mutable for the slice assignment below.
        if not hasattr(self, 'config'):
            self.read_config()
        self.config = bytearray(self.config)
        self.config[offset: offset + length] = data[1:]
        self.write_config()
        # Send empty response
        packet['data'] = bytearray()
    # 0x21: ISP LOGIN
    elif command == 0x21:
        logger.log.info('0x21: Log in to DION')
        # A single zero byte signals success (lol)
        packet['data'] = bytearray(b'\x00')
    # 0x22: ISP LOGOUT
    elif command == 0x22:
        logger.log.info('0x22: Log out of DION')
        self.ma_port = 0
        # Respond with the same packet
    # 0x23: OPEN TCP CONNECTION
    elif command == 0x23:
        # Bytes 0-3 are the IPv4 address, bytes 4-5 the big-endian port.
        self.ma_port = (data[4] << 8) + data[5]
        ip_string = f'{data[0]}.{data[1]}.{data[2]}.{data[3]}'
        logger.log.info(f'0x23: Open TCP Connection to {ip_string}:{self.ma_port}')
        # Respond with success
        packet['id'] = 0xa3
        packet['data'] = bytearray(b'\xff')
        self.http_ready = True
        self.pop_begun = False
    # 0x24: CLOSE TCP CONNECTION
    elif command == 0x24:
        logger.log.info('0x24: Close TCP Connection')
        self.ma_port = 0
        self.response_text = bytearray()
        # Respond with the same packet
    # 0x28: DNS QUERY
    elif command == 0x28:
        logger.log.info(f'0x28: DNS Query for {text(data)}')
        # Every name gets the same fixed answer.
        packet['data'] = bytearray(b'\x13\x37\x13\x37\x00')
    # Hopefully we never hit this, but if we do, something interesting is probably going on
    else:
        logger.log.warning(f'UNKNOWN COMMAND: {hex(command)}')

    # Re-read the attribute: the Crystal battle branch replaces the dict.
    packet = self.packet_data
    packet['size'] = len(packet['data'])
    # Checksum = id + size + every body byte.
    packet['checksum'] = packet['id'] + packet['size'] + sum(packet['data'])
    return ret
# Build a valid POP3 response
def pop_response(self):
    """Return the next chunk (at most 254 bytes) of POP3 reply text.

    When no reply is pending in ``self.response_text``, the command in
    ``self.packet_data['data'][1:]`` is answered and the answer queued:

    * ``STAT`` / ``LIST 1``: one message of ``len(self.email)`` bytes
    * ``LIST <other>``: ``-ERR``
    * ``LIST``: listing with the single message
    * ``TOP 1 0``: the email up to its first blank line
    * ``RETR 1``: the whole email
    * any other command, or the first call of a session: ``+OK``
    * no command after the session has begun: ``-ERR`` (logged)

    The head of the queue is then removed and returned.
    """
    if len(self.response_text) == 0:
        received = self.packet_data['data']
        # Byte 0 is not part of the POP text.
        pop_text = received[1:] if len(received) > 1 else bytearray()
        if pop_text.startswith((b'STAT', b'LIST 1')):
            self.response_text += f'+OK 1 {len(self.email)}\r\n'.encode()
        elif pop_text.startswith(b'LIST '):
            self.response_text += b'-ERR\r\n'
        elif pop_text.startswith(b'LIST'):
            self.response_text += f'+OK\r\n1 {len(self.email)}\r\n.\r\n'.encode()
        # Email headers only
        elif pop_text.startswith(b'TOP 1 0'):
            self.response_text += b'+OK\r\n' + self.email.split(b'\r\n\r\n')[0] + b'\r\n\r\n.\r\n'
        elif pop_text.startswith(b'RETR 1'):
            self.response_text += b'+OK\r\n' + self.email + b'\r\n.\r\n'
        # Reply +OK at the start of any session or to any other command
        elif len(pop_text) > 0 or not self.pop_begun:
            self.pop_begun = True
            self.response_text += b'+OK\r\n'
        # Something went wrong?
        else:
            logger.log.error(f'Got unhandled POP Command: {pop_text}')
            self.response_text += b'-ERR\r\n'
    # Can only send 254 bytes at a time
    text_to_send = self.response_text[:254]
    self.response_text = self.response_text[254:]
    return text_to_send
# Build a valid HTTP response
def http_response(self):
    """Return the next chunk (at most 254 bytes) of HTTP reply text.

    With no reply pending, the bytes after the first one of the received
    packet are appended to ``self.http_text`` and the accumulated text is
    parsed.  A request counts as complete when it is not a POST, or when
    a POST's content has reached its ``Content-Length``; at that point
    ``self.http_ready`` is cleared, the accumulated text is discarded and
    the reply is looked up in ``self.http_responses`` (404 when unknown).
    """
    if not self.response_text:
        received = self.packet_data['data']
        if len(received) > 1:
            self.http_text += received[1:]
        http_data = self.parse_http(self.http_text)

        if 'request' in http_data:
            if not http_data['request'].startswith(b'POST'):
                # This is a GET, so we're definitely done
                self.http_ready = False
            elif 'Content-Length' in http_data['headers']:
                # A POST is done once all of its announced content is here
                expected = int(http_data['headers']['Content-Length'])
                if len(http_data['content']) >= expected:
                    self.http_ready = False

        if not self.http_ready:
            # Clear self.http_text before next request
            self.http_text = bytearray()
            request_line = http_data['request']
            response = self.http_responses.get(bytes(request_line))
            # The upload GET is answered 401 from the table until the game
            # repeats it with the Gb-Auth-ID header set.
            is_upload = request_line == b'GET /cgb/upload?name=/01/CGB-BXTJ/exchange/10upload.cgi HTTP/1.0'
            if is_upload and 'Gb-Auth-ID' in http_data['headers']:
                response = {
                    'response': b'HTTP/1.0 200 OK',
                    'headers': {'Gb-Auth-ID': b'HAIL GIOVANNI'},
                    'content': b'\r\n',
                }
            if response is None:
                # If we hit here, we got asked for something we don't know how to reply to. The game will
                # probably error out, so go try to implement what it asked for ;)
                logger.log.warning(f'No response known for {http_data["request"].decode()}')
                self.response_text = b'HTTP/1.0 404 Not Found\r\n\r\n'
            else:
                pieces = [response['response'], b'\r\n']
                for header, value in response['headers'].items():
                    pieces += [header.encode(), b': ', value, b'\r\n']
                pieces += [b'\r\n', response['content']]
                self.response_text = b''.join(pieces)

    # Can only send 254 bytes at a time
    text_to_send = self.response_text[:254]
    self.response_text = self.response_text[254:]
    return text_to_send
# Parse the incoming HTTP request
def parse_http(self, recv):
    """Split the raw HTTP request received so far into its parts.

    Args:
        recv: bytes-like request text accumulated so far.

    Returns:
        ``{}`` while the request line is incomplete (no CRLF yet).
        Otherwise a dict with ``request`` (the request line),
        ``headers`` (name -> raw value) and ``content`` (everything after
        the blank line).  Until the blank line has arrived, ``headers``
        and ``content`` are empty.  Header lines without ``': '`` are
        skipped.
    """
    http_data = {}
    line_end = recv.find(b'\r\n')
    # Is this a complete request line?
    if line_end == -1:
        return http_data
    http_data['request'] = recv[:line_end]
    http_data['headers'] = {}
    header_end = recv.find(b'\r\n\r\n')
    if header_end == -1:
        # No blank line yet, so there is no body to speak of.
        http_data['content'] = recv[:0]
    else:
        # Deal with headers (absent when the blank line directly follows
        # the request line)
        if line_end < header_end:
            for line in recv[line_end + 2:header_end].split(b'\r\n'):
                name, separator, value = line.partition(b': ')
                if separator:
                    http_data['headers'][name.decode()] = value
        http_data['content'] = recv[header_end + 4:]
    # Assume we have a Pokemon request so print it nicely
    if http_data['request'].startswith(b'POST') and len(http_data['content']) > 0:
        hexdump(http_data['content'])
        from pprint import pprint
        pprint(pkm.parse_pokemon_request(http_data['content']).data)
    return http_data
# Update the current transfer state
def update_state(self, new_state):
    """Log the transition at debug level, then make ``new_state`` current."""
    previous = self.state
    logger.log.debug(f'State Update: [{previous}] --> [{new_state}]')
    self.state = new_state
# Load the config file
def read_config(self):
    """Load ``config.bin`` from the current directory into ``self.config``.

    When the file does not exist, it is created holding 192 zero bytes and
    those become the configuration.  ``self.config`` is always a
    ``bytearray`` so that it can be modified in place afterwards.
    """
    try:
        with open('config.bin', 'rb') as f:
            self.config = bytearray(f.read())
    except FileNotFoundError:
        self.config = bytearray(192)
        with open('config.bin', 'wb') as f:
            f.write(self.config)
    return
# Write out the config file once it has been changed
def write_config(self):
    """Overwrite ``config.bin`` in the current directory with ``self.config``."""
    with open('config.bin', 'wb') as config_file:
        config_file.write(self.config)
# Parse the configuration file
class parse_config:
    """Decode a Mobile Adapter GB configuration dump (expected: 192 bytes).

    After construction, ``self.config`` holds the raw bytes and
    ``self.data`` an ``OrderedDict`` with the decoded fields.
    """

    def __init__(self, fname):
        """Read ``fname`` and parse it.

        A size other than 192 bytes is logged as critical but parsing is
        still attempted.

        Raises:
            FileNotFoundError: ``fname`` does not exist (logged first).
        """
        try:
            with open(fname, 'rb') as f:
                self.config = f.read()
        except FileNotFoundError:
            logger.log.critical(f'Error: {fname} not found!')
            # Without data there is nothing to parse.
            raise
        if len(self.config) != 192:
            logger.log.critical(f'Error: config data not 192 bytes, have {len(self.config)}')
        self.parse()
        return

    def parse(self):
        """Slice ``self.config`` into named fields and verify the checksum."""
        raw = self.config
        self.data = OrderedDict()
        self.data['magic'] = raw[0x0:0x0+2]
        self.data['reg'] = raw[0x2:0x2+2]
        self.data['dns1'] = self.parse_ip(raw[0x4:0x4+4])
        self.data['dns2'] = self.parse_ip(raw[0x8:0x8+4])
        self.data['login'] = raw[0xc:0xc+32]
        self.data['email'] = raw[0x2c:0x2c+30]
        self.data['smtp'] = raw[0x4a:0x4a+20]
        self.data['pop'] = raw[0x5e:0x5e+24]
        self.data['conf1'] = raw[0x76:0x76+24]
        self.data['conf2'] = raw[0x8e:0x8e+24]
        self.data['conf3'] = raw[0xa6:0xa6+24]
        self.data['checksum'] = raw[0xbe:0xbe+2]
        # Known registration markers get a readable name; anything else
        # stays as the raw two bytes.
        if self.data['reg'] == b'\x01\x00':
            self.data['reg'] = 'NOT REGISTERED'
        elif self.data['reg'] == b'\x81\x00':
            self.data['reg'] = 'REGISTERED'
        # Text fields are NUL-padded.
        for field in ('login', 'email', 'smtp', 'pop'):
            self.data[field] = self.data[field].rstrip(b'\x00').decode()
        for field in ('conf1', 'conf2', 'conf3'):
            self.data[field] = self.parse_config_data(self.data[field])
        # The stored checksum is the big-endian 16-bit sum of the first 190
        # bytes (at most 190 * 255, so it always fits in two bytes).
        expected = sum(raw[:190]).to_bytes(2, 'big')
        if expected != self.data['checksum']:
            logger.log.error(f'Checksum incorrect! Got {self.data["checksum"]}, should be {expected}')
        return

    # Render IP address as a string
    def parse_ip(self, b):
        """Return the bytes of ``b`` as dotted decimal, e.g. ``'10.0.0.1'``."""
        return '.'.join(str(octet) for octet in b[:4])

    # Parse the data part of the configuration file
    def parse_config_data(self, b):
        """Decode one 24-byte slot into ``{'tel': ..., 'svc': ...}``.

        The first 8 bytes hold the number, one digit per nibble, ended by
        a 0xf nibble; a nibble of 0xa is shown as ``#``.  The remaining
        bytes are the NUL-padded service name.
        """
        tel = bytearray()
        for packed in b[:8]:
            high, low = packed >> 4, packed & 0xf
            if high == 0xf:
                break
            tel.append(high + 0x30)
            if low == 0xf:
                break
            tel.append(low + 0x30)
        # 0xa + 0x30 is ':'; it is displayed as '#'.
        tel = tel.replace(b':', b'#')
        return {'tel': tel.decode(), 'svc': b[8:].rstrip(b'\x00').decode()}

    def print(self):
        """Write the decoded configuration to the info log."""
        logger.log.info('Configuration Data:')
        logger.log.info(f'Registration State: {self.data["reg"]}')
        logger.log.info(f'Primary DNS: {self.data["dns1"]}')
        logger.log.info(f'Secondary DNS: {self.data["dns2"]}')
        logger.log.info(f'Login: {self.data["login"]}')
        logger.log.info(f'Email: {self.data["email"]}')
        logger.log.info(f'SMTP: {self.data["smtp"]}')
        logger.log.info(f'POP: {self.data["pop"]}')
        logger.log.info(f'Slot 1: {self.data["conf1"]["svc"]}: {self.data["conf1"]["tel"]}')
        logger.log.info(f'Slot 2: {self.data["conf2"]["svc"]}: {self.data["conf2"]["tel"]}')
        logger.log.info(f'Slot 3: {self.data["conf3"]["svc"]}: {self.data["conf3"]["tel"]}')
        return
if __name__ == "__main__":
    # Uncomment the next line for debug-level logging:
    #logger.log.setLevel(logger.logging.DEBUG)
    adapter = MobileAdapterGB()
    adapter.main()