pebble/tools/pbpack.py

293 lines
11 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import stm32_crc
import struct
import time
class ResourcePackTableEntry(object):
TABLE_ENTRY_FMT = '<IIII'
def __init__(self, content_index, offset, length, crc):
# The index into the contents array that holds our data in the ResourcePack object
self.content_index = content_index
# The offset into the body of the pbpack in bytes that we can find the content
self.offset = offset
# The length of the content
self.length = length
# The CRC of the content
self.crc = crc
def serialize(self, file_id):
fmt = self.TABLE_ENTRY_FMT
return struct.pack(fmt, file_id, self.offset, self.length, self.crc)
@classmethod
def deserialize(cls, table_entry_data):
"""
Return a tuple of file_id, ResourcePackTableEntry object for the given data.
"""
fmt = cls.TABLE_ENTRY_FMT
file_id, offset, length, crc = struct.unpack(fmt, table_entry_data)
entry = ResourcePackTableEntry(-1, offset, length, crc)
return file_id, entry
def __repr__(self):
return str(self.__dict__)
class ResourcePack(object):
""" Pebble resource pack file format (de)serialization tools.
An instance of this class is an in-memory representation of a resource
pack. The class has a number of methods to facilitate (de)serialization
of .pbpack files.
"""
TABLE_ENTRY_SIZE_BYTES = 16
MANIFEST_FMT = '<III'
MANIFEST_SIZE_BYTES = 12
def get_content_crc(self):
all_contents = self.serialize_content()
return stm32_crc.crc32(all_contents)
def serialize_manifest(self, crc=None, timestamp=None):
fmt = self.MANIFEST_FMT
return struct.pack(fmt, len(self.table_entries), self.crc, self.timestamp)
def serialize_table(self):
# Serialize these entries into table_data
cur_file_id = 1
table_data = ''
for cur_file_id, table_entry in enumerate(self.table_entries, start=1):
table_data += table_entry.serialize(cur_file_id)
# Pad the rest of the table_data up to table_size
for i in xrange(cur_file_id, self.table_size):
table_data += ResourcePackTableEntry(0, 0, 0, 0).serialize(0)
return table_data
def serialize_content(self):
"""
Serialize the content in the order dictated by offsets in the table entries
"""
serialized_content_indexes = set()
serialized_content = []
for entry in sorted(self.table_entries, key=lambda e: e.offset):
if entry.content_index in serialized_content_indexes:
continue
serialized_content_indexes.add(entry.content_index)
serialized_content.append(self.contents[entry.content_index])
return b"".join(serialized_content)
@classmethod
def deserialize(cls, f_in, is_system=True):
resource_pack = cls(is_system)
# Parse manifest:
manifest_data = f_in.read(cls.MANIFEST_SIZE_BYTES)
(num_files, crc, timestamp) = struct.unpack(cls.MANIFEST_FMT, manifest_data)
resource_pack.num_files = num_files
resource_pack.crc = crc
resource_pack.timestamp = timestamp
# Parse table entries:
resource_pack.table_entries = []
for n in xrange(num_files):
table_entry = f_in.read(cls.TABLE_ENTRY_SIZE_BYTES)
file_id, entry = ResourcePackTableEntry.deserialize(table_entry)
if file_id == 0:
# No more entries
break
if file_id != n + 1:
raise Exception("File ID is expected to be %u, but was %u" %
(n + 1, file_id))
resource_pack.table_entries.append(entry)
if len(resource_pack.table_entries) != num_files:
raise Exception("Number of files in manifest is %u, but actual"
"number is %u" % (num_files, n))
# Figure out which content_index to assign to each table entry. Find all unique offset and
# length combinations and then assign content indexes appropriately. We need to include the
# length because we allow zero length resources and a zero length resource will have the
# same offset as a non-zero length resource
unique_offsets = set()
for e in resource_pack.table_entries:
unique_offsets.add((e.offset, e.length))
unique_offsets = list(unique_offsets)
for e in resource_pack.table_entries:
e.content_index = unique_offsets.index((e.offset, e.length))
# Fetch the contents, make sure we only load each unique piece of content once
loaded_content_indexes = set()
for entry in sorted(resource_pack.table_entries, key=lambda e: e.content_index):
if entry.content_index in loaded_content_indexes:
# Already loaded, just skip
continue
loaded_content_indexes.add(entry.content_index)
f_in.seek(entry.offset + resource_pack.content_start)
content = f_in.read(entry.length)
calculated_crc = stm32_crc.crc32(content)
if calculated_crc != entry.crc:
raise Exception("Entry %s does not match CRC of content (%u). "
"Hint: try with%s the --app flag"
% (entry, calculated_crc, "" if is_system else "out"))
resource_pack.contents.append(content)
resource_pack.finalized = True
return resource_pack
def finalize(self):
"""
Take all the resources that have been added using add_resource and finalize the pack. Once
a pack is finalized no more resources may be added.
"""
if (len(self.table_entries) > self.table_size):
raise Exception("Exceeded max number of resources. Must have %d or "
"fewer" % self.table_size)
# Assign offsets to each of the entries in reverse order. The reason we do this in reverse
# is so that the resource at the end is guaranteed to be at the end of the pack.
#
# This is required because the firmware looks at the offset + length of the final resource
# in the table in order to determine the total size of the pack. If the final resource
# is a duplicate of an earlier resource and if we assigned offsets starting at the
# beginning of the table, that final resource would end up pointing to an assigned
# offset somewhere in the middle of the pack, causing the pack to appear to be truncated.
current_offset = sum((len(c) for c in self.contents))
for table_entry in reversed(self.table_entries):
if table_entry.offset == -1:
# This entry doesn't have an offset in the output file yet, assign one to all
# entries that share the same content index
current_offset -= table_entry.length
for e in self.table_entries:
if e.content_index == table_entry.content_index:
e.offset = current_offset
self.crc = self.get_content_crc()
self.finalized = True
def serialize(self, f_out):
if not self.finalized:
self.finalize()
f_out.write(self.serialize_manifest(self.crc))
f_out.write(self.serialize_table())
for c in self.serialize_content():
f_out.write(c)
return self.crc
def add_resource(self, content):
if self.finalized:
raise Exception("Cannot add additional resource, " +
"resource pack has already been finalized")
# If resource already is present, add to table only
try:
# Try to find the index for this content if it already exists. If it doesn't we'll
# throw the ValueError.
content_index = self.contents.index(content)
except ValueError:
# This content is completely new, add it to the contents list.
self.contents.append(content)
content_index = len(self.contents) - 1
crc = stm32_crc.crc32(content)
# Use -1 as the offset as we don't assign offsets until serialize_table
self.table_entries.append(ResourcePackTableEntry(content_index, -1, len(content), crc))
def dump(self):
"""
Dump a bunch of information about this pbpack to stdout
"""
print 'Manifest CRC: 0x%x' % self.crc
print 'Calculated CRC: 0x%x' % self.get_content_crc()
print 'Num Items: %u' % len(self.table_entries)
for i, entry in enumerate(self.table_entries, start=1):
print ' %u: Offset %u Length %u CRC 0x%x' % (i, entry.offset, entry.length, entry.crc)
def __init__(self, is_system):
self.table_size = 512 if is_system else 256
self.content_start = self.MANIFEST_SIZE_BYTES + self.table_size * self.TABLE_ENTRY_SIZE_BYTES
# Note that we never actually set the timestamp in newly generated pbpacks. The timestamp
# field in the manifest is unused in practice and we'll probably reuse that field for
# another purpose in the future.
self.timestamp = 0
# CRC of the content. Note that it only covers the content itself and not the table nor
# the manifest. Calculated during finalize().
self.crc = 0
# List of binary content, where each entry is the raw processed data for each unique
# resource.
self.contents = []
# List of resources that are in the pack. Note that this list may be longer than the
# self.contents list if there are duplicates, duplicated entries (exact same data) will
# not be repeated in self.contents. Each entry is a ResourcePackTableEntry
self.table_entries = []
# Indicates that the ResourcePack has been built and no more resources can be added.
self.finalized = False
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='dump pbpack metadata')
parser.add_argument('pbpack_path', help='path to pbpack to dump')
parser.add_argument('--app', default=False, action='store_true',
help='Indicate this pbpack is an app pbpack')
args = parser.parse_args()
with open(args.pbpack_path, 'rb') as f:
pack = ResourcePack.deserialize(f, is_system=not args.app)
pack.dump()