mirror of
https://github.com/google/pebble.git
synced 2025-03-15 00:31:21 +00:00
184 lines
7.2 KiB
Python
Executable file
184 lines
7.2 KiB
Python
Executable file
#!/usr/bin/env python
|
|
# Copyright 2024 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
|
|
"""
|
|
Sparse Length Encoding
|
|
|
|
A variant of run-length encoding which is tuned specifically to encode binary
|
|
data with long runs of zeroes interspersed with random (poorly-compressible)
|
|
data.
|
|
|
|
The format is fairly simple. The encoded data is a stream of octets (bytes)
|
|
beginning with a one-octet header. This header octet is the 'escape byte' that
|
|
indicates to the decoder that it and the following octets should be treated
|
|
specially. The encoder selects this escape byte to be an octet which occurs
|
|
least frequently (or not at all) in the decoded data.
|
|
|
|
The following octets of the encoded data are emitted literally until an escape
|
|
byte is encountered. The escape byte marks the start of an 'escape sequence'
|
|
comprised of the escape byte itself and one or two following bytes.
|
|
|
|
- The escape byte followed by 0x00 indicates the end of input.
|
|
- The escape byte followed by 0x01 means 'emit a literal escape byte'
|
|
- The escape byte followed by a byte "b" between 0x02 and 0x7f inclusive means
|
|
'emit b zeroes'. This two-byte sequence can encode a run of length 2-127.
|
|
- The escape byte followed by a byte "b" equal to or greater than 0x80
|
|
(i.e. with the MSB set) means 'take the next byte "c" and emit
|
|
((b & 0x7f) << 8 | c)+0x80 zeroes'. This three-byte sequence can encode a run
|
|
of length 128-32895.
|
|
|
|
The minimum overhead for this encoding scheme is three bytes: header and
|
|
end-of-input escape sequence.
|
|
"""
|
|
|
|
from collections import Counter
|
|
from itertools import groupby
|
|
|
|
_MAX_COUNT = 0x807F # max is ((0x7F << 8) | (0xFF) + 0x80
|
|
|
|
|
|
def encode(source):
|
|
# Analyze the source data to select the escape byte. To keep things simple, we don't allow 0 to
|
|
# be the escape character.
|
|
source = bytes(source)
|
|
frequency = Counter({chr(n): 0 for n in range(1, 256)})
|
|
frequency.update(source)
|
|
# most_common() doesn't define what happens if there's a tie in frequency. Let's always pick
|
|
# the lowest value of that frequency to make the encoding predictable.
|
|
occurences = frequency.most_common()
|
|
escape = min(x[0] for x in occurences if x[1] == occurences[-1][1])
|
|
yield escape
|
|
for b, g in groupby(source):
|
|
if b == b'\0':
|
|
# this is a run of zeros
|
|
count = len(list(g))
|
|
while count >= 0x80:
|
|
# encode the number of zeros using two bytes
|
|
unit = min(count, _MAX_COUNT)
|
|
count -= unit
|
|
unit -= 0x80
|
|
yield escape
|
|
yield chr(((unit >> 8) & 0x7F) | 0x80)
|
|
yield chr(unit & 0xFF)
|
|
if count == 1:
|
|
# can't encode a length of 1 zero, so just emit it directly
|
|
yield b
|
|
elif 1 < count < 0x80:
|
|
# encode the number of zeros using one byte
|
|
yield escape
|
|
yield chr(count)
|
|
elif count < 0:
|
|
raise Exception('Encoding malfunctioned')
|
|
else:
|
|
# simply insert the characters (and escape the escape character)
|
|
for _ in g:
|
|
yield b
|
|
if b == escape:
|
|
yield b'\1'
|
|
yield escape
|
|
yield b'\0'
|
|
|
|
|
|
def decode(stream):
|
|
stream = iter(stream)
|
|
escape = next(stream)
|
|
while True:
|
|
char = next(stream)
|
|
if char == escape:
|
|
code = next(stream)
|
|
if code == b'\0':
|
|
return
|
|
elif code == b'\1':
|
|
yield escape
|
|
else:
|
|
if ord(code) & 0x80 == 0:
|
|
count = ord(code)
|
|
else:
|
|
count = (((ord(code) & 0x7f) << 8) | ord(next(stream))) + 0x80
|
|
assert(count <= _MAX_COUNT)
|
|
for _ in xrange(count):
|
|
yield b'\0'
|
|
else:
|
|
yield char
|
|
|
|
|
|
if __name__ == '__main__':
|
|
import sys
|
|
if len(sys.argv) == 1:
|
|
# run unit tests
|
|
import unittest
|
|
|
|
class TestSparseLengthEncoding(unittest.TestCase):
|
|
def test_empty(self):
|
|
raw_data = ''
|
|
encoded_data = ''.join(encode(raw_data))
|
|
decoded_data = ''.join(decode(encoded_data))
|
|
self.assertEquals(encoded_data, '\x01\x01\x00')
|
|
|
|
def test_no_zeros(self):
|
|
raw_data = '\x02\xff\xef\x99'
|
|
encoded_data = ''.join(encode(raw_data))
|
|
decoded_data = ''.join(decode(encoded_data))
|
|
self.assertEquals(encoded_data, '\x01\x02\xff\xef\x99\x01\x00')
|
|
|
|
def test_one_zero(self):
|
|
raw_data = '\x00'
|
|
encoded_data = ''.join(encode(raw_data))
|
|
decoded_data = ''.join(decode(encoded_data))
|
|
self.assertEquals(encoded_data, '\x01\x00\x01\x00')
|
|
|
|
def test_small_number_of_zeros(self):
|
|
# under 0x80 zeros
|
|
raw_data = '\0' * 0x0040
|
|
encoded_data = ''.join(encode(raw_data))
|
|
decoded_data = ''.join(decode(encoded_data))
|
|
self.assertEquals(encoded_data, '\x01\x01\x40\x01\x00')
|
|
self.assertEquals(decoded_data, raw_data)
|
|
|
|
def test_medium_number_of_zeros(self):
|
|
# between 0x80 and 0x807f zeros
|
|
raw_data = '\0' * 0x1800
|
|
encoded_data = ''.join(encode(raw_data))
|
|
decoded_data = ''.join(decode(encoded_data))
|
|
self.assertEquals(encoded_data, '\x01\x01\x97\x80\x01\x00')
|
|
self.assertEquals(decoded_data, raw_data)
|
|
|
|
def test_remainder_one(self):
|
|
# leaves a remainder of 1 zero
|
|
raw_data = '\0' * (0x807f + 1)
|
|
encoded_data = ''.join(encode(raw_data))
|
|
decoded_data = ''.join(decode(encoded_data))
|
|
self.assertEquals(encoded_data, '\x01\x01\xff\xff\x00\x01\x00')
|
|
self.assertEquals(decoded_data, raw_data)
|
|
|
|
def test_remainder_under_128(self):
|
|
# leaves a remainder of 100 zeros
|
|
raw_data = '\0' * (0x807f + 100)
|
|
encoded_data = ''.join(encode(raw_data))
|
|
decoded_data = ''.join(decode(encoded_data))
|
|
self.assertEquals(encoded_data, '\x01\x01\xff\xff\x01\x64\x01\x00')
|
|
self.assertEquals(decoded_data, raw_data)
|
|
|
|
unittest.main()
|
|
elif len(sys.argv) == 2:
|
|
# encode the specified file
|
|
data = open(sys.argv[1], 'rb').read()
|
|
encoded = ''.join(encode(data))
|
|
if ''.join(decode(encoded)) != data:
|
|
raise Exception('Invalid encoding')
|
|
sys.stdout.write(''.join(encode(f)))
|
|
else:
|
|
raise Exception('Invalid arguments')
|