# Listing metadata: 155 lines, 4.0 KiB, Python.
try:
    # Check if deflate is available.
    import deflate
    import io
except ImportError:
    # Either import failed: print the skip marker and stop the script.
    print("SKIP")
    raise SystemExit

# Check if compression is enabled.
# DeflateIO lacking a "write" attribute is treated as "no compression
# support": print the skip marker and stop the script.
if not hasattr(deflate.DeflateIO, "write"):
    print("SKIP")
    raise SystemExit

# Simple compression & decompression.
b = io.BytesIO()
g = deflate.DeflateIO(b, deflate.RAW)
data = b"micropython"
N = 10
for i in range(N):
    g.write(data)
# Close the compressor before reading the underlying buffer.
g.close()
result_raw = b.getvalue()
# The output is expected to be shorter than the N repetitions written.
print(len(result_raw) < len(data) * N)
# Read the compressed bytes back through a new DeflateIO in the same format.
b = io.BytesIO(result_raw)
g = deflate.DeflateIO(b, deflate.RAW)
print(g.read())

# Same, but using a context manager.
# Relies on `data` and `N` defined by the previous section.
b = io.BytesIO()
with deflate.DeflateIO(b, deflate.RAW) as g:
    for i in range(N):
        g.write(data)
# Read the buffer only after the with-block has exited, mirroring the
# explicit g.close() used in the non-context-manager variant.
result_raw = b.getvalue()
print(len(result_raw) < len(data) * N)
b = io.BytesIO(result_raw)
with deflate.DeflateIO(b, deflate.RAW) as g:
    print(g.read())

# Writing to a closed underlying stream.
b = io.BytesIO()
g = deflate.DeflateIO(b, deflate.RAW)
g.write(b"micropython")
# Close the BytesIO underneath the still-open DeflateIO, then write again.
b.close()
try:
    g.write(b"micropython")
except ValueError:
    print("ValueError")

# Writing to a closed DeflateIO.
b = io.BytesIO()
g = deflate.DeflateIO(b, deflate.RAW)
g.write(b"micropython")
# Close the DeflateIO itself, then write again.
g.close()
try:
    g.write(b"micropython")
except OSError:
    print("OSError")


def decompress(data, *args):
    """Return everything read from a DeflateIO wrapped around `data`.

    `data` is placed in an io.BytesIO; any extra positional `args` are
    forwarded unchanged to deflate.DeflateIO after the stream argument.
    """
    buf = io.BytesIO(data)
    with deflate.DeflateIO(buf, *args) as g:
        return g.read()


def compress(data, *args):
    """Write `data` through a DeflateIO and return the bytes it produced.

    Any extra positional `args` are forwarded unchanged to deflate.DeflateIO
    after the stream argument.
    """
    b = io.BytesIO()
    with deflate.DeflateIO(b, *args) as g:
        g.write(data)
    # Collect the output only once the with-block has exited, matching the
    # close-then-getvalue order used elsewhere in this script.
    return b.getvalue()


def compress_error(data, *args):
    """Call compress(data, *args) and print which error it raised, if any.

    Prints "OSError" or "ValueError" when compress() raises that exception.
    Prints nothing when the call succeeds; the result is discarded. Any
    other exception type propagates to the caller.
    """
    try:
        compress(data, *args)
    except OSError:
        print("OSError")
    except ValueError:
        print("ValueError")


# More test patterns.
# Each entry is an (unpacked, packed) pair: input bytes and the exact
# compressed bytes that compress() is compared against.
PATTERNS_RAW = (
    (b"0", b"3\x00\x00"),
    (b"a", b"K\x04\x00"),
    (b"0" * 100, b"3\xa0\x03\x00\x00"),
    (
        bytes(range(64)),
        b"c`dbfaec\xe7\xe0\xe4\xe2\xe6\xe1\xe5\xe3\x17\x10\x14\x12\x16\x11\x15\x13\x97\x90\x94\x92\x96\x91\x95\x93WPTRVQUS\xd7\xd0\xd4\xd2\xd6\xd1\xd5\xd370426153\xb7\xb0\xb4\xb2\xb6\xb1\xb5\xb3\x07\x00",
    ),
)
# Compress each pattern twice, with no format argument and with an explicit
# deflate.RAW, and compare both results against the expected bytes.
for unpacked, packed in PATTERNS_RAW:
    print(compress(unpacked) == packed)
    print(compress(unpacked, deflate.RAW) == packed)

# Verify header and checksum format.
# `packed` is the expected RAW-format output for `unpacked`; the checks that
# follow wrap it in the per-format header and trailer bytes.
unpacked = b"hello"
packed = b"\xcbH\xcd\xc9\xc9\x07\x00"


def check_header(n, a, b):
    """Print the label `n`, adding both values when they differ.

    When `a == b` only `n` is printed. Otherwise `n`, `a` and `b` are
    printed on one line so the mismatching values are visible in the output.
    """
    if a == b:
        print(n)
    else:
        print(n, a, b)


# Compare the output of each format against `packed` wrapped in the header
# and trailer bytes expected for that format.
check_header("RAW", compress(unpacked, deflate.RAW), packed)
check_header(
    "ZLIB(9)", compress(unpacked, deflate.ZLIB, 9), b"\x18\x95" + packed + b"\x06,\x02\x15"
)
check_header(
    "ZLIB(15)", compress(unpacked, deflate.ZLIB, 15), b"\x78\x9c" + packed + b"\x06,\x02\x15"
)
check_header(
    "GZIP",
    compress(unpacked, deflate.GZIP, 9),
    b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x04\x03" + packed + b"\x86\xa6\x106\x05\x00\x00\x00",
)

# Valid wbits values.
# compress_error() prints the error name for a rejected value; the print()
# calls report the compressed length for values that are expected to work.
compress_error(unpacked, deflate.RAW, -1)
print(len(compress(unpacked, deflate.RAW, 0)))
compress_error(unpacked, deflate.RAW, 1)
compress_error(unpacked, deflate.RAW, 4)
for i in range(5, 16):
    print(len(compress(unpacked, deflate.RAW, i)))
compress_error(unpacked, deflate.RAW, 16)

# Invalid values for format.
compress_error(unpacked, -1)
compress_error(unpacked, 5)

# Fill buf with a predictable pseudorandom sequence.
buf = bytearray(1024)
# 16-bit shift register seeded with bits 15 and 0 set.
lfsr = 1 << 15 | 1
for i in range(len(buf)):
    # Feedback bit is the XOR of register bits 0, 1, 3 and 12.
    bit = (lfsr ^ (lfsr >> 1) ^ (lfsr >> 3) ^ (lfsr >> 12)) & 1
    # Shift right by one and insert the feedback bit at bit 15.
    lfsr = (lfsr >> 1) | (bit << 15)
    # Store the low byte of the new register state.
    buf[i] = lfsr & 0xFF

# Verify that compression improves as the window size increases.
# Start from the uncompressed length so the first result must already be
# smaller than the input.
prev_len = len(buf)
for wbits in range(5, 10):
    result = compress(buf, deflate.RAW, wbits)
    next_len = len(result)
    # Each result must be strictly shorter than the previous one and must
    # decompress back to the original buffer.
    print(next_len < prev_len and decompress(result, deflate.RAW, wbits) == buf)
    prev_len = next_len

# Verify that compression is optimal: in the bytes below, the final "123" should be
# compressed by referencing the "123" just before it, and not the one all the way back
# at the start of the bytes.
compressed = compress(b"1234567890abcdefghijklmnopqrstuvwxyz123123", deflate.RAW)
print(len(compressed), compressed)