pebble/waftools/sparse_length_encoding.py
2025-01-27 11:38:16 -08:00

184 lines
7.2 KiB
Python
Executable file

#!/usr/bin/env python
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Sparse Length Encoding
A variant of run-length encoding which is tuned specifically to encode binary
data with long runs of zeroes interspersed with random (poorly-compressible)
data.
The format is fairly simple. The encoded data is a stream of octets (bytes)
beginning with a one-octet header. This header octet is the 'escape byte' that
indicates to the decoder that it and the following octets should be treated
specially. The encoder selects this escape byte to be an octet which occurs
least frequently (or not at all) in the decoded data.
The following octets of the encoded data are emitted literally until an escape
byte is encountered. The escape byte marks the start of an 'escape sequence'
comprised of the escape byte itself and one or two following bytes.
- The escape byte followed by 0x00 indicates the end of input.
- The escape byte followed by 0x01 means 'emit a literal escape byte'
- The escape byte followed by a byte "b" between 0x02 and 0x7f inclusive means
'emit b zeroes'. This two-byte sequence can encode a run of length 2-127.
- The escape byte followed by a byte "b" equal to or greater than 0x80
(i.e. with the MSB set) means 'take the next byte "c" and emit
((b & 0x7f) << 8 | c)+0x80 zeroes'. This three-byte sequence can encode a run
of length 128-32895.
The minimum overhead for this encoding scheme is three bytes: header and
end-of-input escape sequence.
"""
from collections import Counter
from itertools import groupby
_MAX_COUNT = 0x807F # max is ((0x7F << 8) | (0xFF) + 0x80
def encode(source):
# Analyze the source data to select the escape byte. To keep things simple, we don't allow 0 to
# be the escape character.
source = bytes(source)
frequency = Counter({chr(n): 0 for n in range(1, 256)})
frequency.update(source)
# most_common() doesn't define what happens if there's a tie in frequency. Let's always pick
# the lowest value of that frequency to make the encoding predictable.
occurences = frequency.most_common()
escape = min(x[0] for x in occurences if x[1] == occurences[-1][1])
yield escape
for b, g in groupby(source):
if b == b'\0':
# this is a run of zeros
count = len(list(g))
while count >= 0x80:
# encode the number of zeros using two bytes
unit = min(count, _MAX_COUNT)
count -= unit
unit -= 0x80
yield escape
yield chr(((unit >> 8) & 0x7F) | 0x80)
yield chr(unit & 0xFF)
if count == 1:
# can't encode a length of 1 zero, so just emit it directly
yield b
elif 1 < count < 0x80:
# encode the number of zeros using one byte
yield escape
yield chr(count)
elif count < 0:
raise Exception('Encoding malfunctioned')
else:
# simply insert the characters (and escape the escape character)
for _ in g:
yield b
if b == escape:
yield b'\1'
yield escape
yield b'\0'
def decode(stream):
stream = iter(stream)
escape = next(stream)
while True:
char = next(stream)
if char == escape:
code = next(stream)
if code == b'\0':
return
elif code == b'\1':
yield escape
else:
if ord(code) & 0x80 == 0:
count = ord(code)
else:
count = (((ord(code) & 0x7f) << 8) | ord(next(stream))) + 0x80
assert(count <= _MAX_COUNT)
for _ in xrange(count):
yield b'\0'
else:
yield char
if __name__ == '__main__':
import sys
if len(sys.argv) == 1:
# run unit tests
import unittest
class TestSparseLengthEncoding(unittest.TestCase):
def test_empty(self):
raw_data = ''
encoded_data = ''.join(encode(raw_data))
decoded_data = ''.join(decode(encoded_data))
self.assertEquals(encoded_data, '\x01\x01\x00')
def test_no_zeros(self):
raw_data = '\x02\xff\xef\x99'
encoded_data = ''.join(encode(raw_data))
decoded_data = ''.join(decode(encoded_data))
self.assertEquals(encoded_data, '\x01\x02\xff\xef\x99\x01\x00')
def test_one_zero(self):
raw_data = '\x00'
encoded_data = ''.join(encode(raw_data))
decoded_data = ''.join(decode(encoded_data))
self.assertEquals(encoded_data, '\x01\x00\x01\x00')
def test_small_number_of_zeros(self):
# under 0x80 zeros
raw_data = '\0' * 0x0040
encoded_data = ''.join(encode(raw_data))
decoded_data = ''.join(decode(encoded_data))
self.assertEquals(encoded_data, '\x01\x01\x40\x01\x00')
self.assertEquals(decoded_data, raw_data)
def test_medium_number_of_zeros(self):
# between 0x80 and 0x807f zeros
raw_data = '\0' * 0x1800
encoded_data = ''.join(encode(raw_data))
decoded_data = ''.join(decode(encoded_data))
self.assertEquals(encoded_data, '\x01\x01\x97\x80\x01\x00')
self.assertEquals(decoded_data, raw_data)
def test_remainder_one(self):
# leaves a remainder of 1 zero
raw_data = '\0' * (0x807f + 1)
encoded_data = ''.join(encode(raw_data))
decoded_data = ''.join(decode(encoded_data))
self.assertEquals(encoded_data, '\x01\x01\xff\xff\x00\x01\x00')
self.assertEquals(decoded_data, raw_data)
def test_remainder_under_128(self):
# leaves a remainder of 100 zeros
raw_data = '\0' * (0x807f + 100)
encoded_data = ''.join(encode(raw_data))
decoded_data = ''.join(decode(encoded_data))
self.assertEquals(encoded_data, '\x01\x01\xff\xff\x01\x64\x01\x00')
self.assertEquals(decoded_data, raw_data)
unittest.main()
elif len(sys.argv) == 2:
# encode the specified file
data = open(sys.argv[1], 'rb').read()
encoded = ''.join(encode(data))
if ''.join(decode(encoded)) != data:
raise Exception('Invalid encoding')
sys.stdout.write(''.join(encode(f)))
else:
raise Exception('Invalid arguments')