spaCy/tests/serialize/test_huffman.py

107 lines
3.2 KiB
Python
Raw Normal View History

2015-07-13 10:58:07 +00:00
from __future__ import unicode_literals
from __future__ import division
import pytest
from spacy.serialize.huffman import HuffmanCodec
from spacy.serialize.bits import BitArray
2015-07-13 10:58:07 +00:00
import numpy
2015-07-19 15:59:51 +00:00
import math
2015-07-13 10:58:07 +00:00
from heapq import heappush, heappop, heapify
from collections import defaultdict
def py_encode(symb2freq):
"""Huffman encode the given dict mapping symbols to weights
From Rosetta Code
"""
heap = [[wt, [sym, ""]] for sym, wt in symb2freq.items()]
heapify(heap)
while len(heap) > 1:
lo = heappop(heap)
hi = heappop(heap)
for pair in lo[1:]:
pair[1] = '0' + pair[1]
for pair in hi[1:]:
pair[1] = '1' + pair[1]
heappush(heap, [lo[0] + hi[0]] + lo[1:] + hi[1:])
return dict(heappop(heap)[1:])
def test1():
probs = numpy.zeros(shape=(10,), dtype=numpy.float32)
probs[0] = 0.3
probs[1] = 0.2
probs[2] = 0.15
probs[3] = 0.1
probs[4] = 0.06
probs[5] = 0.02
probs[6] = 0.01
probs[7] = 0.005
probs[8] = 0.0001
probs[9] = 0.000001
2015-07-19 15:59:51 +00:00
codec = HuffmanCodec(list(enumerate(probs)))
2015-07-13 10:58:07 +00:00
py_codes = py_encode(dict(enumerate(probs)))
2015-07-24 01:47:59 +00:00
py_codes = list(py_codes.items())
2015-07-13 10:58:07 +00:00
py_codes.sort()
assert codec.strings == [c for i, c in py_codes]
def test_round_trip():
freqs = {'the': 10, 'quick': 3, 'brown': 4, 'fox': 1, 'jumped': 5, 'over': 8,
'lazy': 1, 'dog': 2, '.': 9}
2015-07-19 15:59:51 +00:00
codec = HuffmanCodec(freqs.items())
2015-07-13 10:58:07 +00:00
message = ['the', 'quick', 'brown', 'fox', 'jumped', 'over', 'the',
'the', 'lazy', 'dog', '.']
2015-07-19 15:59:51 +00:00
strings = list(codec.strings)
codes = {codec.leaves[i]: strings[i] for i in range(len(codec.leaves))}
bits = codec.encode(message)
2015-07-24 01:47:59 +00:00
string = ''.join('{0:b}'.format(c).rjust(8, '0')[::-1] for c in bits.as_bytes())
2015-07-13 10:58:07 +00:00
for word in message:
code = codes[word]
assert string[:len(code)] == code
string = string[len(code):]
2015-07-19 15:59:51 +00:00
unpacked = [0] * len(message)
bits.seek(0)
codec.decode(bits, unpacked)
2015-07-13 10:58:07 +00:00
assert message == unpacked
def test_rosetta():
txt = u"this is an example for huffman encoding"
symb2freq = defaultdict(int)
for ch in txt:
symb2freq[ch] += 1
2015-07-24 01:47:59 +00:00
by_freq = list(symb2freq.items())
2015-07-13 10:58:07 +00:00
by_freq.sort(reverse=True, key=lambda item: item[1])
symbols = [sym for sym, prob in by_freq]
2015-07-19 15:59:51 +00:00
codec = HuffmanCodec(symb2freq.items())
2015-07-13 10:58:07 +00:00
py_codec = py_encode(symb2freq)
2015-07-19 15:59:51 +00:00
codes = {codec.leaves[i]: codec.strings[i] for i in range(len(codec.leaves))}
2015-07-13 10:58:07 +00:00
my_lengths = defaultdict(int)
py_lengths = defaultdict(int)
2015-07-19 15:59:51 +00:00
for symb, freq in symb2freq.items():
my = codes[symb]
my_lengths[len(my)] += freq
py_lengths[len(py_codec[symb])] += freq
2015-07-13 10:58:07 +00:00
my_exp_len = sum(length * weight for length, weight in my_lengths.items())
py_exp_len = sum(length * weight for length, weight in py_lengths.items())
assert my_exp_len == py_exp_len
2015-07-22 23:19:11 +00:00
@pytest.mark.slow
2015-07-13 10:58:07 +00:00
def test_vocab(EN):
2015-07-19 15:59:51 +00:00
codec = HuffmanCodec([(w.orth, numpy.exp(w.prob)) for w in EN.vocab])
2015-07-13 10:58:07 +00:00
expected_length = 0
for i, code in enumerate(codec.strings):
2015-07-19 15:59:51 +00:00
leaf = codec.leaves[i]
expected_length += len(code) * numpy.exp(EN.vocab[leaf].prob)
2015-07-13 10:58:07 +00:00
assert 8 < expected_length < 15