201 lines
6.4 KiB
Python
201 lines
6.4 KiB
Python
from __future__ import print_function
|
|
|
|
import collections
|
|
import re
|
|
import sys
|
|
|
|
import gzip
|
|
import zlib
|
|
|
|
|
|
_COMPRESSED_MARKER = 0xFF
|
|
|
|
|
|
def check_non_ascii(msg):
|
|
for c in msg:
|
|
if ord(c) >= 0x80:
|
|
print(
|
|
'Unable to generate compressed data: message "{}" contains a non-ascii character "{}".'.format(
|
|
msg, c
|
|
),
|
|
file=sys.stderr,
|
|
)
|
|
sys.exit(1)
|
|
|
|
|
|
# Replace <char><space> with <char | 0x80>.
|
|
# Trival scheme to demo/test.
|
|
def space_compression(error_strings):
|
|
for line in error_strings:
|
|
check_non_ascii(line)
|
|
result = ""
|
|
for i in range(len(line)):
|
|
if i > 0 and line[i] == " ":
|
|
result = result[:-1]
|
|
result += "\\{:03o}".format(ord(line[i - 1]))
|
|
else:
|
|
result += line[i]
|
|
error_strings[line] = result
|
|
return None
|
|
|
|
|
|
# Replace common words with <0x80 | index>.
|
|
# Index is into a table of words stored as aaaaa<0x80|a>bbb<0x80|b>...
|
|
# Replaced words are assumed to have spaces either side to avoid having to store the spaces in the compressed strings.
|
|
def word_compression(error_strings):
|
|
topn = collections.Counter()
|
|
|
|
for line in error_strings.keys():
|
|
check_non_ascii(line)
|
|
for word in line.split(" "):
|
|
topn[word] += 1
|
|
|
|
# Order not just by frequency, but by expected saving. i.e. prefer a longer string that is used less frequently.
|
|
def bytes_saved(item):
|
|
w, n = item
|
|
return -((len(w) + 1) * (n - 1))
|
|
|
|
top128 = sorted(topn.items(), key=bytes_saved)[:128]
|
|
|
|
index = [w for w, _ in top128]
|
|
index_lookup = {w: i for i, w in enumerate(index)}
|
|
|
|
for line in error_strings.keys():
|
|
result = ""
|
|
need_space = False
|
|
for word in line.split(" "):
|
|
if word in index_lookup:
|
|
result += "\\{:03o}".format(0b10000000 | index_lookup[word])
|
|
need_space = False
|
|
else:
|
|
if need_space:
|
|
result += " "
|
|
need_space = True
|
|
result += word
|
|
error_strings[line] = result.strip()
|
|
|
|
return "".join(w[:-1] + "\\{:03o}".format(0b10000000 | ord(w[-1])) for w in index)
|
|
|
|
|
|
# Replace chars in text with variable length bit sequence.
|
|
# For comparison only (the table is not emitted).
|
|
def huffman_compression(error_strings):
|
|
# https://github.com/tannewt/huffman
|
|
import huffman
|
|
|
|
all_strings = "".join(error_strings)
|
|
cb = huffman.codebook(collections.Counter(all_strings).items())
|
|
|
|
for line in error_strings:
|
|
b = "1"
|
|
for c in line:
|
|
b += cb[c]
|
|
n = len(b)
|
|
if n % 8 != 0:
|
|
n += 8 - (n % 8)
|
|
result = ""
|
|
for i in range(0, n, 8):
|
|
result += "\\{:03o}".format(int(b[i : i + 8], 2))
|
|
if len(result) > len(line) * 4:
|
|
result = line
|
|
error_strings[line] = result
|
|
|
|
# TODO: This would be the prefix lengths and the table ordering.
|
|
return "_" * (10 + len(cb))
|
|
|
|
|
|
# Replace common N-letter sequences with <0x80 | index>, where
|
|
# the common sequences are stored in a separate table.
|
|
# This isn't very useful, need a smarter way to find top-ngrams.
|
|
def ngram_compression(error_strings):
|
|
topn = collections.Counter()
|
|
N = 2
|
|
|
|
for line in error_strings.keys():
|
|
check_non_ascii(line)
|
|
if len(line) < N:
|
|
continue
|
|
for i in range(0, len(line) - N, N):
|
|
topn[line[i : i + N]] += 1
|
|
|
|
def bytes_saved(item):
|
|
w, n = item
|
|
return -(len(w) * (n - 1))
|
|
|
|
top128 = sorted(topn.items(), key=bytes_saved)[:128]
|
|
|
|
index = [w for w, _ in top128]
|
|
index_lookup = {w: i for i, w in enumerate(index)}
|
|
|
|
for line in error_strings.keys():
|
|
result = ""
|
|
for i in range(0, len(line) - N + 1, N):
|
|
word = line[i : i + N]
|
|
if word in index_lookup:
|
|
result += "\\{:03o}".format(0b10000000 | index_lookup[word])
|
|
else:
|
|
result += word
|
|
if len(line) % N != 0:
|
|
result += line[len(line) - len(line) % N :]
|
|
error_strings[line] = result.strip()
|
|
|
|
return "".join(index)
|
|
|
|
|
|
def main(collected_path, fn):
|
|
error_strings = {}
|
|
max_uncompressed_len = 0
|
|
num_uses = 0
|
|
|
|
# Read in all MP_ERROR_TEXT strings.
|
|
with open(collected_path, "r") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
num_uses += 1
|
|
error_strings[line] = None
|
|
max_uncompressed_len = max(max_uncompressed_len, len(line))
|
|
|
|
# So that objexcept.c can figure out how big the buffer needs to be.
|
|
print("#define MP_MAX_UNCOMPRESSED_TEXT_LEN ({})".format(max_uncompressed_len))
|
|
|
|
# Run the compression.
|
|
compressed_data = fn(error_strings)
|
|
|
|
# Print the data table.
|
|
print('MP_COMPRESSED_DATA("{}")'.format(compressed_data))
|
|
|
|
# Print the replacements.
|
|
for uncomp, comp in error_strings.items():
|
|
print('MP_MATCH_COMPRESSED("{}", "\\{:03o}{}")'.format(uncomp, _COMPRESSED_MARKER, comp))
|
|
|
|
# Used to calculate the "true" length of the (escaped) compressed strings.
|
|
def unescape(s):
|
|
return re.sub(r"\\\d\d\d", "!", s)
|
|
|
|
# Stats. Note this doesn't include the cost of the decompressor code.
|
|
uncomp_len = sum(len(s) + 1 for s in error_strings.keys())
|
|
comp_len = sum(1 + len(unescape(s)) + 1 for s in error_strings.values())
|
|
data_len = len(compressed_data) + 1 if compressed_data else 0
|
|
print("// Total input length: {}".format(uncomp_len))
|
|
print("// Total compressed length: {}".format(comp_len))
|
|
print("// Total data length: {}".format(data_len))
|
|
print("// Predicted saving: {}".format(uncomp_len - comp_len - data_len))
|
|
|
|
# Somewhat meaningless comparison to zlib/gzip.
|
|
all_input_bytes = "\\0".join(error_strings.keys()).encode()
|
|
print()
|
|
if hasattr(gzip, "compress"):
|
|
gzip_len = len(gzip.compress(all_input_bytes)) + num_uses * 4
|
|
print("// gzip length: {}".format(gzip_len))
|
|
print("// Percentage of gzip: {:.1f}%".format(100 * (comp_len + data_len) / gzip_len))
|
|
if hasattr(zlib, "compress"):
|
|
zlib_len = len(zlib.compress(all_input_bytes)) + num_uses * 4
|
|
print("// zlib length: {}".format(zlib_len))
|
|
print("// Percentage of zlib: {:.1f}%".format(100 * (comp_len + data_len) / zlib_len))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main(sys.argv[1], word_compression)
|