pokecrystal/tools/pokemontools/lz.py

581 lines
16 KiB
Python

# -*- coding: utf-8 -*-
"""
Pokemon Crystal data de/compression.
"""
"""
A rundown of Pokemon Crystal's compression scheme:
Control commands occupy bits 5-7.
Bits 0-4 serve as the first parameter <n> for each command.
"""
lz_commands = {
    # Basic commands: everything they need follows the command byte directly.
    'literal':   0, # n values for n bytes
    'iterate':   1, # one value for n bytes
    'alternate': 2, # alternate two values for n bytes
    'blank':     3, # zero for n bytes

    # Repeater commands repeat any data that was just decompressed.
    # They take an additional signed parameter <s> to mark a relative starting point.
    # These wrap around (positive from the start, negative from the current position).
    'repeat':    4, # n bytes starting from s
    'flip':      5, # n bytes in reverse bit order starting from s
    'reverse':   6, # n bytes backwards starting from s

    # The long command is used when 5 bits aren't enough.
    # Bits 2-4 contain a new control code.
    # Bits 0-1 are appended to a new byte as 8-9, allowing a 10-bit parameter.
    'long':      7, # n is now 10 bits for a new control code
}

# A long command carries a 10-bit parameter; lengths can't go higher than that.
max_length = 0x400

# A standard command carries a 5-bit parameter.
lowmax = 0x20

# If 0xff is encountered instead of a command, decompression ends.
lz_end = 0xff
# Lookup table: bit_flipped[b] is byte b with its eight bits in reverse order
# (bit 0 <-> bit 7, bit 1 <-> bit 6, ...). Example: 11100100 -> 00100111.
# range() is used instead of xrange() so the table builds on Python 2 and 3.
bit_flipped = [
    sum(((byte >> i) & 1) << (7 - i) for i in range(8))
    for byte in range(0x100)
]
class Compressed:
    """
    Compress data into the lz command stream described at the top of this file.

    Usage:
        lz = Compressed(data).output
    or
        lz = Compressed().compress(data)
    or
        c = Compressed()
        c.data = data
        lz = c.compress()

    Positional arguments are assigned, in order, to 'data', 'commands',
    'debug' and 'literal_only'; the same names (or any other attribute) can
    be passed as keywords. 'literal_only' is stored but not read anywhere in
    this class.

    The result (self.output) is a list of byte values ending with lz_end.

    There are some issues with reproducing the target compressor.
    Some notes are listed here:
        - the criteria for detecting a lookback is inconsistent
            - sometimes lookbacks that are mostly 0s are pruned, sometimes not
        - target appears to skip ahead if it can use a lookback soon, stopping the current command short or in some cases truncating it with literals.
            - this has been implemented, but the specifics are unknown
        - self.min_scores: It's unknown if blank's minimum score should be 1 or 2. Most likely it's 1, with some other hack to account for edge cases.
            - may be related to the above
        - target does not appear to compress backwards
    """

    def __init__(self, *args, **kwargs):
        # A command is only emitted when its score (the number of bytes it
        # covers) is greater than its entry here; see _net_score().
        self.min_scores = {
            'blank':     1,
            'iterate':   2,
            'alternate': 3,
            'repeat':    3,
            'reverse':   3,
            'flip':      3,
        }

        # Tie-breaker between equally profitable commands: earlier wins.
        self.preference = [
            'repeat',
            'blank',
            'flip',
            'reverse',
            'iterate',
            'alternate',
            #'literal',
        ]

        self.lookback_methods = 'repeat', 'reverse', 'flip'

        # Defaults first, then keywords, then positional arguments.
        self.__dict__.update({
            'data': None,
            'commands': lz_commands,
            'debug': False,
            'literal_only': False,
        })

        self.arg_names = 'data', 'commands', 'debug', 'literal_only'

        self.__dict__.update(kwargs)
        self.__dict__.update(dict(zip(self.arg_names, args)))

        if self.data is not None:
            self.compress()

    def compress(self, data=None):
        """
        Compress <data> (or self.data when omitted) and return self.output.

        The input is converted with bytearray(), so it may be any value that
        bytearray() accepts. The returned list ends with lz_end.
        """
        if data is not None:
            self.data = data

        self.data = list(bytearray(self.data))

        self.indexes = {}
        self.lookbacks = dict((method, {}) for method in self.lookback_methods)

        self.address = 0
        self.end = len(self.data)
        self.output = []
        self.literal = None  # start address of the pending literal run, if any

        while self.address < self.end:
            if self.score():
                # Bytes that no command wanted come first, as a literal.
                self.do_literal()
                self.do_winner()
            else:
                if self.literal is None:
                    self.literal = self.address
                self.address += 1

        self.do_literal()

        self.output += [lz_end]
        return self.output

    def reset_scores(self):
        """
        Clear the per-address scoring state (scores, offsets, helpers).
        """
        self.scores = dict.fromkeys(self.min_scores, 0)
        self.offsets = {}
        self.helpers = {}

    def bit_flip(self, byte):
        """
        Return <byte> with its bits in reverse order.
        """
        return bit_flipped[byte]

    def _net_score(self, method, score):
        """
        Return how far <score> exceeds what <method> needs to be worth emitting.

        Scores above lowmax need the long command form and are charged one
        extra byte. A command is usable when the result is positive.
        """
        return score - self.min_scores[method] - int(score > lowmax)

    def do_literal(self):
        """
        Emit the pending literal run, if there is one, and clear it.

        Runs longer than max_length are split into several literal commands,
        because a single command cannot carry more than a 10-bit length.
        """
        if self.literal is None:
            return

        length = abs(self.address - self.literal)
        start = min(self.literal, self.address + 1)
        pending = self.data[start:start + length]

        for chunk_start in range(0, len(pending), max_length):
            chunk = pending[chunk_start:chunk_start + max_length]
            self.helpers['literal'] = chunk
            self.do_cmd('literal', len(chunk))

        self.literal = None

    def score(self):
        """
        Score every command at self.address.

        Returns True when at least one command is worth emitting here.
        """
        self.reset_scores()

        # An explicit loop: map() is lazy on Python 3 and would never call
        # score_literal().
        for method in ('iterate', 'alternate', 'blank'):
            self.score_literal(method)

        for method in self.lookback_methods:
            self.scores[method], self.offsets[method] = self.find_lookback(method, self.address)

        self.stop_short()

        return any(
            self._net_score(method, score) > 0
            for method, score in self.scores.items()
        )

    def stop_short(self):
        """
        If a lookback is close, reduce the scores of other commands.
        """
        best_score = max(self.scores.values())

        for method in self.lookback_methods:
            min_score = self.min_scores[method]
            for address in range(self.address + 1, self.address + best_score):
                length, _ = self.find_lookback(method, address)
                if length > max(min_score, best_score):
                    # BUG: lookbacks can reduce themselves. This appears to be a bug in the target also.
                    limit = address - self.address
                    for m in self.scores:
                        self.scores[m] = min(self.scores[m], limit)

    def read(self, address=None):
        """
        Return the byte at <address> (default self.address), or None when
        the address is outside the data.
        """
        if address is None:
            address = self.address
        if 0 <= address < len(self.data):
            return self.data[address]
        return None

    def find_all_lookbacks(self):
        """
        Call find_lookback() for every method at every address of the data.
        """
        for method in self.lookback_methods:
            for address in range(len(self.data)):
                self.find_lookback(method, address)

    def find_lookback(self, method, address=None):
        """Temporarily stubbed, because the real function doesn't run in polynomial time."""
        return 0, None

    def broken_find_lookback(self, method, address=None):
        """
        Search the data before <address> for the best source for <method>.

        Not called anywhere in this class; find_lookback() is stubbed in its
        place. Returns a (length, index) tuple, or (0, None) when nothing
        matches. Results are cached in self.lookbacks[method][address].
        """
        if address is None:
            address = self.address

        existing = self.lookbacks.get(method, {}).get(address)
        if existing is not None:
            return existing

        lookback = 0, None

        # Better to not carelessly optimize at the moment.
        # if address < 2:
        #     return lookback

        byte = self.read(address)
        if byte is None:
            return lookback

        direction, mutate = {
            'repeat':  ( 1, int),
            'reverse': (-1, int),
            'flip':    ( 1, self.bit_flip),
        }[method]

        # Doesn't seem to help
        # if mutate == self.bit_flip:
        #     if byte == 0:
        #         self.lookbacks[method][address] = lookback
        #         return lookback

        data_len = len(self.data)

        def is_two_byte_index(index):
            # 1 when the source is too far back for a one-byte offset.
            # None (nothing found yet) also counts as 1; this is what the
            # Python 2 ordering of None against integers used to yield.
            return int(index is None or index < address - 0x7f)

        for index in self.get_indexes(mutate(byte)):
            if index >= address:
                break

            old_length, old_index = lookback
            if direction == 1:
                if old_length > data_len - index:
                    break
            else:
                if old_length > index:
                    continue

            if self.read(index) is None:
                continue

            length = 1 # we know there's at least one match, or we wouldn't be checking this index
            while True:
                this_byte = self.read(address + length)
                that_byte = self.read(index + length * direction)
                if that_byte is None or this_byte != mutate(that_byte):
                    break
                length += 1

            score = length - is_two_byte_index(index)
            old_score = old_length - is_two_byte_index(old_index)
            # The original pair of nested conditions reduces to this test.
            if score >= old_score:
                # XXX maybe avoid two-byte indexes when possible
                lookback = length, index

        self.lookbacks[method][address] = lookback
        return lookback

    def get_indexes(self, byte):
        """
        Return every address in the data holding <byte>, in ascending order.

        The list is computed once per byte value and cached in self.indexes.
        """
        if byte not in self.indexes:
            self.indexes[byte] = [
                index for index, value in enumerate(self.data) if value == byte
            ]
        return self.indexes[byte]

    def score_literal(self, method):
        """
        Score 'blank', 'iterate' or 'alternate' at self.address.

        Stores the run length in self.scores[method] and the bytes the
        command needs as parameters in self.helpers[method].
        """
        address = self.address

        compare = {
            'blank': [0],
            'iterate': [self.read(address)],
            'alternate': [self.read(address), self.read(address + 1)],
        }[method]

        # XXX may or may not be correct
        if method == 'alternate' and compare[0] == 0:
            return

        # Stop at the end of the data: read() returns None there, which must
        # not be allowed to "match" a None entry in compare.
        period = len(compare)
        length = 0
        while True:
            byte = self.read(address + length)
            if byte is None or byte != compare[length % period]:
                break
            length += 1

        self.scores[method] = length
        self.helpers[method] = compare

    def do_winner(self):
        """
        Emit the most profitable command at self.address and advance past
        the bytes it covers.

        Only meaningful after score() has returned True.
        """
        winners = [
            (method, score)
            for method, score in self.scores.items()
            if self._net_score(method, score) > 0
        ]
        # Highest net score first; ties are broken by self.preference.
        winners.sort(
            key=lambda item: (
                -self._net_score(item[0], item[1]),
                self.preference.index(item[0])
            )
        )
        winner, score = winners[0]

        length = min(score, max_length)
        self.do_cmd(winner, length)
        self.address += length

    def do_cmd(self, cmd, length):
        """
        Append the encoding of command <cmd> covering <length> bytes to
        self.output.

        Lengths above lowmax use the two-byte 'long' header. The header is
        followed by the command's parameter bytes from self.helpers and, for
        lookback commands, by the offset from self.offsets.
        """
        start_address = self.address

        # Lengths are stored minus one.
        cmd_length = length - 1

        output = []

        if length > lowmax:
            output.append(
                (self.commands['long'] << 5)
                + (self.commands[cmd] << 2)
                + (cmd_length >> 8)
            )
            output.append(
                cmd_length & 0xff
            )
        else:
            output.append(
                (self.commands[cmd] << 5)
                + cmd_length
            )

        self.helpers['blank'] = [] # quick hack
        output += self.helpers.get(cmd, [])

        if cmd in self.lookback_methods:
            offset = self.offsets[cmd]
            # Negative offsets are one byte.
            # Positive offsets are two.
            if 0 < start_address - offset - 1 <= 0x7f:
                offset = (start_address - offset - 1) | 0x80
                output += [offset]
            else:
                output += [offset // 0x100, offset % 0x100] # big endian

        if self.debug:
            print(' '.join(map(str, [
                cmd, length, '\t',
                ' '.join(map('{:02x}'.format, output)),
                self.data[start_address:start_address+length] if cmd in self.lookback_methods else '',
            ])))

        self.output += output
class Decompressed:
    """
    Interpret and decompress lz-compressed data, usually 2bpp.

    Usage:
        data = Decompressed(lz).output
    or
        data = Decompressed().decompress(lz)
    or
        d = Decompressed()
        d.lz = lz
        data = d.decompress()

    To decompress from offset 0x80000 in a rom:
        data = Decompressed(rom, start=0x80000).output

    Positional arguments are assigned, in order, to 'lz', 'start',
    'commands' and 'debug'; the same names can be passed as keywords.

    After decompression:
        self.output           the decompressed bytes, as a list of integers
        self.compressed_data  the slice of the input that was consumed
        self.used_commands    one (name, attributes) entry per command
    """

    lz = None
    start = 0
    commands = lz_commands
    debug = False

    arg_names = 'lz', 'start', 'commands', 'debug'

    def __init__(self, *args, **kwargs):
        self.__dict__.update(dict(zip(self.arg_names, args)))
        self.__dict__.update(kwargs)

        # Map command code -> command name.
        self.command_names = dict(
            (code, name) for name, code in self.commands.items()
        )
        self.address = self.start

        if self.lz is not None:
            self.decompress()
            # The listing only exists once something has been decompressed.
            if self.debug:
                print(self.command_list())

    def command_list(self):
        """
        Return a text listing of the commands that were used. Useful for debugging.

        Each line shows the output address, the command name and length, the
        raw command bytes and, for lookback commands, the bytes produced.
        """
        lines = []

        output_address = 0
        for name, attrs in self.used_commands:
            length = attrs['length']
            address = attrs['address']

            line = '{2:03x} {0}: {1}'.format(name, length, output_address)
            line += '\t' + ' '.join(
                '{:02x}'.format(int(byte))
                for byte in self.lz[address:address + attrs['cmd_length']]
            )

            if attrs['offset'] is not None:
                # What a lookback wrote is exactly the source bytes after the
                # command's transformation. Reading it back from the output
                # avoids a stepped slice, which yields the wrong bytes when a
                # 'reverse' runs down to the start of the output.
                produced = self.output[output_address:output_address + length]
                line += ' [' + ' '.join(map('{:02x}'.format, produced)) + ']'

            lines.append(line + '\n')
            output_address += length

        return ''.join(lines)

    def decompress(self, lz=None):
        """
        Decompress <lz> (or self.lz when omitted) and return self.output.

        Reading starts at self.address, which __init__ sets to self.start,
        and stops after the lz_end byte.

        Raises IndexError if the data ends before lz_end is found, and
        KeyError if a command has no handler.
        """
        if lz is not None:
            self.lz = lz

        self.lz = bytearray(self.lz)

        self.used_commands = []
        self.output = []

        while True:
            cmd_address = self.address
            self.offset = None
            self.direction = None

            if self.byte == lz_end:
                self.next()
                break

            self.cmd = (self.byte & 0b11100000) >> 5

            if self.cmd_name == 'long':
                # 10-bit length
                self.cmd = (self.byte & 0b00011100) >> 2
                self.length = (self.next() & 0b00000011) * 0x100
                self.length += self.next() + 1
            else:
                # 5-bit length
                self.length = (self.next() & 0b00011111) + 1

            # Look the handler up on the instance so that subclasses work too.
            name = self.cmd_name
            handler = getattr(self, name, None) if name else None
            if not callable(handler):
                raise KeyError(name)
            handler()

            self.used_commands += [(
                name,
                {
                    'length': self.length,
                    'address': cmd_address,
                    'offset': self.offset,
                    'cmd_length': self.address - cmd_address,
                    'direction': self.direction,
                }
            )]

        # Keep track of the data we just decompressed.
        self.compressed_data = self.lz[self.start : self.address]

        return self.output

    @property
    def byte(self):
        """
        The input byte at the current address.
        """
        return self.lz[ self.address ]

    def next(self):
        """
        Return the input byte at the current address and advance past it.
        """
        byte = self.byte
        self.address += 1
        return byte

    @property
    def cmd_name(self):
        """
        The name of the current command code, or None if it has no name.
        """
        return self.command_names.get(self.cmd)

    def get_offset(self):
        """
        Read a lookback offset from the input into self.offset.

        A first byte of 0x80 or more is a one-byte offset counted back from
        the end of the output; otherwise two bytes (big endian) give an
        absolute position in the output.
        """
        if self.byte >= 0x80:
            # negative
            offset = self.next() & 0x7f
            offset = len(self.output) - offset - 1
        else:
            # positive
            offset = self.next() * 0x100
            offset += self.next()

        self.offset = offset

    def literal(self):
        """
        Copy data directly.
        """
        self.output += self.lz[ self.address : self.address + self.length ]
        self.address += self.length

    def iterate(self):
        """
        Write one byte repeatedly.
        """
        self.output += [self.next()] * self.length

    def alternate(self):
        """
        Write alternating bytes.
        """
        alts = [self.next(), self.next()]
        self.output += [ alts[x & 1] for x in range(self.length) ]

    def blank(self):
        """
        Write zeros.
        """
        self.output += [0] * self.length

    def flip(self):
        """
        Repeat flipped bytes from output.

        Example: 11100100 -> 00100111
        """
        self._repeat(table=bit_flipped)

    def reverse(self):
        """
        Repeat reversed bytes from output.
        """
        self._repeat(direction=-1)

    def repeat(self):
        """
        Repeat bytes from output.
        """
        self._repeat()

    def _repeat(self, direction=1, table=None):
        """
        Copy self.length bytes from the output, starting at the offset read
        from the input and stepping by <direction>. Each byte is translated
        through <table> when one is given.
        """
        self.get_offset()
        self.direction = direction
        # Note: appends must be one at a time (this way, repeats can draw from themselves if required)
        for i in range(self.length):
            byte = self.output[ self.offset + i * direction ]
            self.output.append( table[byte] if table else byte )