mirror of
https://github.com/google/pebble.git
synced 2025-03-15 08:41:21 +00:00
522 lines
17 KiB
Python
522 lines
17 KiB
Python
|
# Copyright 2024 Google LLC
|
||
|
#
|
||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
# you may not use this file except in compliance with the License.
|
||
|
# You may obtain a copy of the License at
|
||
|
#
|
||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||
|
#
|
||
|
# Unless required by applicable law or agreed to in writing, software
|
||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
# See the License for the specific language governing permissions and
|
||
|
# limitations under the License.
|
||
|
|
||
|
try:
|
||
|
import gdb
|
||
|
except ImportError:
|
||
|
raise Exception('This file is a GDB module.\n'
|
||
|
'It is not intended to be run outside of GDB.\n'
|
||
|
'Hint: to load a script in GDB, use `source this_file.py`')
|
||
|
|
||
|
|
||
|
import logging
|
||
|
import string
|
||
|
import types
|
||
|
import datetime
|
||
|
|
||
|
from collections import namedtuple, defaultdict, OrderedDict
|
||
|
from gdb_tintin import FreeRTOSMutex, Tasks, LinkedList
|
||
|
from gdb_symbols import get_static_variable, get_static_function
|
||
|
from gdb_tintin_metadata import TintinMetadata
|
||
|
|
||
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Registry of recognizers keyed by name; register_recognizer() adds the entries.
recognizers = {}

# Registry entry: `impl` is the recognizer callable and `depends_on` is the name of the
# recognizer whose results it needs (or None).
_Recognizer = namedtuple('Recognizer', 'impl depends_on')
|
||
|
|
||
|
|
||
|
def register_recognizer(name, fn, dependency=None):
    """ Adds a recognizer to the module-wide registry.

    Recognizers are run against the blocks of a heap. A registry entry is made up of
    a name, an implementation and an optional dependency. The implementation is called
    with a block, a heap object and a results dictionary, and returns either a casted
    block or None.

    Registering a name a second time replaces the earlier entry.

    See Recognizer for auto-registering declarative classes.
    """
    entry = _Recognizer(impl=fn, depends_on=dependency)
    recognizers[name] = entry
|
||
|
|
||
|
|
||
|
def parse_heap(heap, recognizer_subset=None):
    """ Runs recognizers over a heap and collects the blocks they recognize.

    Args:
        heap: Heap object providing free_blocks() and allocated_blocks().
        recognizer_subset: Optional collection of recognizer names to run. When it is
            None or empty, every registered recognizer is run. Dependencies that are
            not part of the subset are run as well, but their results are removed
            before returning.

    Returns:
        A dictionary mapping 'Free' to heap.free_blocks() and each recognizer name to
        the list of truthy values its implementation returned. A recognizer that
        raised has no entry.

    Raises:
        KeyError: if recognizer_subset names a recognizer that is not registered.
    """
    results = {
        'Free': heap.free_blocks()
    }

    heap_recognizers = recognizers
    if recognizer_subset:
        heap_recognizers = {name: recognizers[name] for name in recognizer_subset}

    ordered_recognizers, hidden = _order_recognizers(heap_recognizers)
    logger.info('Running: {}'.format(', '.join(ordered_recognizers.keys())))
    for name, recognizer in ordered_recognizers.items():
        try:
            # Build the list inside the try so that a failure on any block skips the
            # whole recognizer instead of storing partial results.
            results[name] = [casted for casted in
                             (recognizer.impl(block, heap, results)
                              for block in heap.allocated_blocks())
                             if casted]
        except Exception:
            # Only Exception is caught: Ctrl-C (KeyboardInterrupt) must still be able
            # to abort a long heap scan.
            print(name + " hit an exception. Skipping")
            logger.exception('Recognizer %s failed', name)

    for dependency in hidden:
        # A hidden dependency that failed above never stored a result.
        results.pop(dependency, None)

    return results
|
||
|
|
||
|
|
||
|
def _order_recognizers(recognizer_subset):
    """ Orders recognizers so that each one runs after the recognizer it depends on.

    Dependencies missing from recognizer_subset are looked up in the global
    `recognizers` registry and pulled in, transitively. Their names are reported as
    hidden so the caller can drop their results afterwards.

    Args:
        recognizer_subset: Dictionary of recognizer name -> registry entry (an object
            with a `depends_on` attribute). It is not modified.

    Returns:
        A two-element list [ordered, hidden]: `ordered` is an OrderedDict of
        name -> entry in execution order, `hidden` is the list of names that were
        added only to satisfy a dependency.

    Raises:
        KeyError: if a dependency is not present in the global registry.
    """
    pending = dict(recognizer_subset)
    ordered = OrderedDict()
    hidden = []

    while pending:
        # Everything whose dependency (if any) is already scheduled can run next.
        ready = [name for name, recognizer in pending.items()
                 if not recognizer.depends_on or recognizer.depends_on in ordered]
        if ready:
            for name in ready:
                ordered[name] = pending.pop(name)
            continue

        # Nothing is runnable, so some dependency was not requested. Queue it like any
        # other recognizer so that its own dependency gets resolved as well.
        missing = []
        for recognizer in pending.values():
            dependency = recognizer.depends_on
            if dependency not in pending and dependency not in missing:
                missing.append(dependency)

        if not missing:
            # The remaining recognizers only depend on each other. Run them anyway,
            # best effort, rather than dropping them.
            logger.warning('Circular recognizer dependency: {}'.format(
                ', '.join(sorted(pending))))
            for name in list(pending):
                ordered[name] = pending.pop(name)
            break

        for dependency in missing:
            logger.info('Adding dependency: {}'.format(dependency))
            pending[dependency] = recognizers[dependency]
            hidden.append(dependency)

    return [ordered, hidden]
|
||
|
|
||
|
|
||
|
class RecognizerType(type):
    """ Metaclass behind the declarative Recognizer classes.

    For every class it creates, it turns plain functions into class methods, translates
    the special dependency 'Heap' into the `uses_heap` flag, and registers the class as
    a recognizer (except for the base class named 'Recognizer'). Calling the resulting
    class runs the recognizer on a block rather than instantiating the class.
    """

    def __new__(cls, name, bases, dct):
        # dct is updated in place on purpose: __init__ below receives the same
        # dictionary and reads the rewritten 'depends_on' from it.
        for key, value in list(dct.items()):
            # Convert instance methods into class methods
            if isinstance(value, types.FunctionType):
                dct[key] = classmethod(value)

        if dct.get('depends_on') == 'Heap':
            # 'Heap' is not a recognizer: it means "search every block in the heap".
            dct['depends_on'] = None
            dct['uses_heap'] = True
        else:
            dct['uses_heap'] = False

        return type.__new__(cls, name, bases, dct)

    def __init__(cls, name, bases, dct):
        if name != 'Recognizer':
            register_recognizer(name, cls, dct.get('depends_on'))

        super(RecognizerType, cls).__init__(name, bases, dct)

    def __call__(cls, block, heap, results):
        """ Returns either a casted block or None.

        This allows the class object itself to implement the recognizer protocol.
        This means we can register the class as a recognizer without creating an instance of it.

        Args:
            block: Heap block to examine; must provide `size` and `cast()`.
            heap: Heap object; iterated for heap-dependent recognizers and asked for
                object_size() when the class declares a type.
            results: Dictionary of recognizer name -> blocks recognized so far.
        """
        try:
            right_size = (block.size in heap.object_size(cls.type)) if cls.type else True
        except gdb.error:
            # Type doesn't exist
            return None

        if not right_size:
            # Bail out before collecting search_blocks, which for heap-dependent
            # recognizers means walking the entire heap.
            return None

        if cls.uses_heap:
            search_blocks = [r_block.data for r_block in heap]
        else:
            search_blocks = [r_block.data for r_block in results.get(cls.depends_on) or []]

        if cls.is_type(block.cast(cls.type), search_blocks):
            return block.cast(cls.type, clone=True)
        return None

    def __str__(cls):
        return cls.__name__
|
||
|
|
||
|
|
||
|
class Recognizer(object):
    """ Base class for declarative recognizers.

    Subclasses are added to the recognizer dictionary automatically by the metaclass.

    Declarative recognizers are singletons: the class itself acts as the recognizer and
    is never instantiated, so __init__ is never called and any modification of a class
    member is persistent. The same holds for subclasses.

    Example:
    >>> # A Recognizer's class name is its friendly name.
    ... class Test(Recognizer):
    ...
    ...     # Its (optional) type is the C struct it's searching for.
    ...     type = 'TestStruct'
    ...
    ...     # Its (optional) dependency is the friendly name of the type
    ...     # whose results it depends on. Those results are passed to
    ...     # is_type as search_blocks.
    ...     depends_on = 'Heap'
    ...
    ...     # is_type() is the comparison function. If type is not None,
    ...     # each block will be casted to 'type'. Note that self actually
    ...     # refers to the class.
    ...     def is_type(self, block, search_blocks):
    ...         return block['foo'] == 4
    """
    __metaclass__ = RecognizerType

    # No dependency and no C type by default.
    depends_on = None
    type = None

    def is_type(self, block, search_blocks):
        # Default comparison: accept every block.
        return True
|
||
|
|
||
|
|
||
|
# --- Recognizers --- #
|
||
|
|
||
|
|
||
|
# --- Mutexes --- #
|
||
|
class PebbleMutex(Recognizer):
    """ Recognizes PebbleMutexCommon blocks.

    A block is accepted when its FreeRTOS mutex is unlocked exactly when 'lr' is zero,
    and the mutex reports between 0 and 10 waiters.
    """
    type = 'PebbleMutexCommon'

    def is_type(self, pebble_mutex, search_blocks):
        freertos_mutex = FreeRTOSMutex(pebble_mutex['freertos_mutex'])
        unlocked = not freertos_mutex.locked()
        lr_is_clear = (pebble_mutex['lr'] == 0)
        if not (unlocked == lr_is_clear):
            return False
        return (0 <= freertos_mutex.num_waiters() <= 10)
|
||
|
|
||
|
|
||
|
class LightMutex(Recognizer):
    """ Recognizes LightMutex_t blocks referenced by a recognized PebbleMutex.

    A block is accepted when it equals the 'freertos_mutex' field of one of the
    PebbleMutex results.
    """
    type = 'LightMutex_t'
    depends_on = 'PebbleMutex'

    def is_type(self, mutex, search_blocks):
        owned_mutexes = [pebble_mutex['freertos_mutex'] for pebble_mutex in search_blocks]
        return mutex in owned_mutexes
|
||
|
|
||
|
|
||
|
# --- Queues --- #
|
||
|
class Queue(Recognizer):
    """ Recognizes blocks that look like a FreeRTOS Queue_t.

    No `type` is declared, so blocks are not filtered by size; each block is
    reinterpreted as a Queue_t pointer and its fields are checked for consistency:
    a non-zero length, a non-null write pointer, a head at or after the block itself,
    and a tail exactly uxLength * uxItemSize past the head.
    """
    depends_on = 'Heap'

    def is_type(self, data, search_blocks):
        queue_type = gdb.lookup_type('Queue_t')
        queue = data.cast(queue_type.pointer())

        storage_size = queue['uxLength'] * queue['uxItemSize']
        correct_head = (queue['pcHead'] >= data)
        correct_tail = (queue['pcTail'] == queue['pcHead'] + storage_size)
        return (queue['uxLength'] > 0 and queue['pcWriteTo'] != 0 and correct_head and correct_tail)
|
||
|
|
||
|
|
||
|
class Semaphore(Recognizer):
    """ Recognizes semaphores among the recognized queues.

    A block is accepted when its 'uxItemSize' is zero and it is one of the Queue
    results.
    """
    type = 'Queue_t'
    depends_on = 'Queue'

    def is_type(self, semaphore, search_blocks):
        stores_no_items = (semaphore['uxItemSize'] == 0)
        if not stores_no_items:
            return False
        return semaphore in search_blocks
|
||
|
|
||
|
|
||
|
# --- Data Logging --- #
|
||
|
class DataLoggingSession(Recognizer):
    """ Recognizes DataLoggingSession blocks.

    A block is accepted when its 'task' field is at least 0 and below Tasks().total.
    """
    type = 'DataLoggingSession'

    def is_type(self, dls, search_blocks):
        task_count = Tasks().total
        owning_task = dls['task']
        return 0 <= owning_task < task_count
|
||
|
|
||
|
|
||
|
class ActiveDLS(Recognizer):
    """ Recognizes DataLoggingActiveState blocks referenced by a recognized session.

    A block is accepted when it equals the 'data' field of one of the
    DataLoggingSession results.
    """
    type = 'DataLoggingActiveState'
    depends_on = 'DataLoggingSession'

    def is_type(self, active_dls, search_blocks):
        session_states = [session['data'] for session in search_blocks]
        return active_dls in session_states
|
||
|
|
||
|
|
||
|
class DLSBuffer(Recognizer):
    """ Recognizes the buffer storage of recognized active data logging states.

    A block is accepted when it equals the 'buffer_storage' field of one of the
    ActiveDLS results.
    """
    depends_on = 'ActiveDLS'

    def is_type(self, dls_buffer, search_blocks):
        known_buffers = [active_state['buffer_storage'] for active_state in search_blocks]
        return dls_buffer in known_buffers
|
||
|
|
||
|
|
||
|
# --- Tasks --- #
|
||
|
class Task(Recognizer):
    """ Recognizes TCB_t blocks whose 'pcTaskName' passes is_string(). """
    type = 'TCB_t'

    def is_type(self, task, search_blocks):
        task_name = task['pcTaskName']
        return is_string(task_name)
|
||
|
|
||
|
|
||
|
class TaskStack(Recognizer):
    """ Recognizes the stacks of recognized tasks.

    A block is accepted when it equals the 'pxStack' field of one of the Task results.
    """
    depends_on = 'Task'

    def is_type(self, stack, search_blocks):
        task_stacks = [task['pxStack'] for task in search_blocks]
        return stack in task_stacks
|
||
|
|
||
|
|
||
|
# --- String-related --- #
|
||
|
def is_string(data):
    """ Tells whether `data` holds text that looks like a human-readable string.

    `data` must provide a string() method whose result can be decoded as UTF-8.
    The text qualifies when it is longer than two characters and every character is
    either outside the ASCII range or listed in string.printable. Text that fails to
    decode or encode does not qualify.
    """
    try:
        text = data.string().decode('utf-8')
    except (UnicodeDecodeError, UnicodeEncodeError):
        return False

    if not len(text) > 2:
        return False

    for codepoint in text:
        if not (ord(codepoint) > 127 or codepoint in string.printable):
            return False
    return True
|
||
|
|
||
|
|
||
|
class String(Recognizer):
    """ Recognizes blocks that pass is_string() when viewed as a `char *`. """

    def is_type(self, string, search_blocks):
        char_pointer_type = gdb.lookup_type('char').pointer()
        as_chars = string.cast(char_pointer_type)
        return is_string(as_chars)
|
||
|
|
||
|
|
||
|
class PFSFileNode(Recognizer):
    """ Recognizes PFSFileChangedCallbackNode blocks.

    A block is accepted when its 'name' field is one of the String results.
    """
    type = 'PFSFileChangedCallbackNode'
    depends_on = 'String'

    def is_type(self, filenode, search_blocks):
        file_name = filenode['name']
        return file_name in search_blocks
|
||
|
|
||
|
|
||
|
class SettingsFile(Recognizer):
    """ Recognizes SettingsFile blocks by checking that their fields are plausible.

    A block is accepted when 'last_modified' converts to a date whose year is after
    2010 and before 2038, and when used_space <= max_used_space <= max_space_total.
    """
    type = 'SettingsFile'
    depends_on = 'String'

    def is_type(self, settings, search_blocks):
        try:
            modified_at = datetime.datetime.fromtimestamp(int(settings['last_modified']))
        except ValueError:
            return False

        # If a user closes a settings file but forgets to free the
        # underlying memory the name will not be in search_blocks
        if not settings['max_used_space'] <= settings['max_space_total']:
            return False
        if not settings['used_space'] <= settings['max_used_space']:
            return False
        return 2010 < modified_at.year < 2038
|
||
|
|
||
|
|
||
|
# --- Analytics --- #
|
||
|
class AnalyticsStopwatch(Recognizer):
    """ Recognizes AnalyticsStopwatchNode blocks.

    A block is accepted when it is contained in the linked list built from the
    dereferenced static variable 's_stopwatch_list'.
    """
    type = 'AnalyticsStopwatchNode'

    def is_type(self, stopwatch, search_blocks):
        list_ref = get_static_variable('s_stopwatch_list')
        list_head = gdb.parse_and_eval(list_ref).dereference()
        return stopwatch in LinkedList(list_head)
|
||
|
|
||
|
|
||
|
class AnalyticsHeartbeatList(Recognizer):
    """ Recognizes AnalyticsHeartbeatList blocks.

    A block is accepted when it is contained in the linked list built from the 'node'
    field of the static variable 's_app_heartbeat_list'.
    """
    type = 'AnalyticsHeartbeatList'

    def is_type(self, listnode, search_blocks):
        list_ref = get_static_variable('s_app_heartbeat_list')
        first_node = gdb.parse_and_eval(list_ref)['node']
        return listnode in LinkedList(first_node)
|
||
|
|
||
|
|
||
|
class AnalyticsHeartbeat(Recognizer):
    """ Recognizes analytics heartbeats.

    A block is accepted when it equals the static variable 's_device_heartbeat' or the
    'heartbeat' field of one of the AnalyticsHeartbeatList results.
    """
    depends_on = 'AnalyticsHeartbeatList'

    def is_type(self, heartbeat, search_blocks):
        device_ref = get_static_variable('s_device_heartbeat')
        device_heartbeat = gdb.parse_and_eval(device_ref)
        if heartbeat == device_heartbeat:
            return True

        app_heartbeats = [list_node['heartbeat'] for list_node in search_blocks]
        return heartbeat in app_heartbeats
|
||
|
|
||
|
|
||
|
# --- Timers --- #
|
||
|
class TaskTimer(Recognizer):
    """ Recognizes TaskTimer blocks.

    A block is accepted when its 'id' lies between 1 and the 'next_id' of the static
    variable 's_task_timer_manager' minus one, inclusive.
    """
    type = 'TaskTimer'

    def is_type(self, timer, search_blocks):
        timer_manager_ref = get_static_variable('s_task_timer_manager')
        timer_manager = gdb.parse_and_eval(timer_manager_ref)
        # The highest id handed out so far is one below the next id to be assigned.
        max_timer_id = int(timer_manager["next_id"]) - 1
        return 1 <= timer['id'] <= max_timer_id
|
||
|
|
||
|
|
||
|
class EventedTimer(Recognizer):
    """ Recognizes EventedTimer blocks.

    A block is accepted when its 'sys_timer_id' lies between 1 and the 'next_id' of
    's_task_timer_manager' minus one, and its 'target_task' is at least 0 and below
    Tasks().total.
    """
    type = 'EventedTimer'

    def is_type(self, timer, search_blocks):
        manager_ref = get_static_variable('s_task_timer_manager')
        manager = gdb.parse_and_eval(manager_ref)
        newest_timer_id = int(manager["next_id"]) - 1
        task_count = Tasks().total

        if not (1 <= timer['sys_timer_id'] <= newest_timer_id):
            return False
        return 0 <= timer['target_task'] < task_count
|
||
|
|
||
|
|
||
|
# --- Communication --- #
|
||
|
class CommSession(Recognizer):
    """ Recognizes CommSession blocks.

    A block is accepted when its 'transport_imp' pointer refers to one of the known
    transport implementation variables that could be resolved.
    """
    type = 'CommSession'
    depends_on = 'Heap'

    # Static variables holding the transport implementations a session may point at.
    # Not every one of them is necessarily present in a given firmware image.
    transport_variables = (
        's_iap_transport_implementation',
        's_plain_spp_transport_implementation',
        's_ppogatt_transport_implementation',
        's_qemu_transport_implementation',
        's_pulse_transport_implementation',
    )

    def try_session_address(self, var_name):
        """ Returns the address of the static variable `var_name`, or None if the
        lookup fails for any reason.
        """
        try:
            var_ref = get_static_variable(var_name)
            return gdb.parse_and_eval(var_ref).address
        except Exception:
            # Best effort: a missing transport is expected. Only Exception is caught so
            # that Ctrl-C (KeyboardInterrupt) still propagates.
            return None

    def is_type(self, session, search_blocks):
        transport_imp = session['transport_imp'].dereference().address

        candidates = [self.try_session_address(name) for name in self.transport_variables]
        # Drop transports that could not be resolved so None never takes part in the
        # comparison.
        resolved = [address for address in candidates if address is not None]
        return transport_imp in resolved
|
||
|
|
||
|
|
||
|
# --- Windows --- #
|
||
|
class NotificationNode(Recognizer):
    """ Recognizes NotifList blocks.

    A block is accepted when it is contained in the linked list built from the
    'list_node' field of the static variable 's_presented_notifs'.
    """
    type = 'NotifList'

    def is_type(self, node, search_blocks):
        notifs_ref = get_static_variable('s_presented_notifs')
        first_node = gdb.parse_and_eval(notifs_ref)['list_node']
        return node in LinkedList(first_node)
|
||
|
|
||
|
|
||
|
class Modal(Recognizer):
    """ Recognizes WindowStackItem blocks.

    A block is accepted when it is contained in the linked list built from the 'node'
    field of the static variable 's_modal_window_stack'.
    """
    type = 'WindowStackItem'

    def is_type(self, node, search_blocks):
        stack_ref = get_static_variable('s_modal_window_stack')
        first_node = gdb.parse_and_eval(stack_ref)['node']
        return node in LinkedList(first_node)
|
||
|
|
||
|
|
||
|
# --- Misc --- #
|
||
|
class EventService(Recognizer):
    """ Recognizes EventServiceEntry blocks.

    A block is accepted when its 'num_subscribers' equals the number of non-zero
    entries among the first Tasks().total elements of its 'subscribers' array.
    """
    type = 'EventServiceEntry'

    def is_type(self, entry, search_blocks):
        task_count = Tasks().total
        occupied_slots = sum(1 for slot in xrange(task_count)
                             if entry['subscribers'][slot] != 0)
        return entry['num_subscribers'] == occupied_slots
|
||
|
|
||
|
|
||
|
class VoiceEncoder(Recognizer):
    # Declares only a C type and defines no is_type() of its own.
    type = 'VoiceEncoder'
|
||
|
|
||
|
|
||
|
class AlgState(Recognizer):
    """ Recognizes the AlgState block.

    A block is accepted when its address equals the value of the static variable
    's_alg_state'.
    """
    type = 'AlgState'

    def is_type(self, state, search_blocks):
        alg_state_ref = get_static_variable('s_alg_state')
        alg_state = gdb.parse_and_eval(alg_state_ref)
        block_address = state.dereference().address
        return (block_address == alg_state)
|
||
|
|
||
|
|
||
|
class KAlgState(Recognizer):
    """ Recognizes the KAlgState block.

    A block is accepted when its address equals the 'k_state' field of the static
    variable 's_alg_state'.
    """
    type = 'KAlgState'

    def is_type(self, state, search_blocks):
        alg_state_ref = get_static_variable('s_alg_state')
        alg_state = gdb.parse_and_eval(alg_state_ref)
        k_state = alg_state["k_state"]
        return (k_state == state.dereference().address)
|
||
|
|
||
|
|
||
|
class CachedResource(Recognizer):
    """ Recognizes CachedResource blocks.

    A block is accepted when it is contained in the linked list built from the
    'list_node' field of the static variable 's_resource_list'.
    """
    type = 'CachedResource'

    def is_type(self, resource, search_blocks):
        list_ref = get_static_variable('s_resource_list')
        first_node = gdb.parse_and_eval(list_ref)['list_node']
        return resource in LinkedList(first_node)
|
||
|
|
||
|
|
||
|
# --- Applib --- #
|
||
|
class ModalWindow(Recognizer):
    """ Recognizes the windows of recognized modal stack items.

    A block is accepted when it equals the 'modal_window' field of one of the Modal
    results.
    """
    depends_on = 'Modal'

    def is_type(self, window, search_blocks):
        # Note that these are most likely dialogs. Try casting them.
        modal_windows = [stack_item['modal_window'] for stack_item in search_blocks]
        return window in modal_windows
|
||
|
|
||
|
# Name -> value mapping of `enum GBitmapFormat`, or None when the enum cannot be
# looked up.
try:
    # make_enum_dict lives in the gdb.types submodule; import it explicitly rather than
    # relying on some other module having loaded it already.
    import gdb.types
    GBitmapFormats = gdb.types.make_enum_dict(gdb.lookup_type("enum GBitmapFormat"))
except gdb.error:
    GBitmapFormats = None
|
||
|
|
||
|
# FIXME: This can be improved. It results in a lot of false negatives
|
||
|
class GBitmap(Recognizer):
    """ Recognizes GBitmap blocks by checking that their geometry fits their format.

    Nothing is accepted when GBitmapFormats is unavailable. Otherwise the format must
    be a known GBitmapFormat value, and either the format is 8-bit circular with
    'row_size_bytes' of zero, or it is not circular and row_size_bytes * 8 is at least
    the bounds width.
    """
    type = 'GBitmap'

    def is_type(self, bitmap, search_blocks):
        if GBitmapFormats is None:
            return False

        stride = bitmap['row_size_bytes']
        width = bitmap['bounds']['size']['w']
        fmt = int(bitmap['info']['format'])
        circular = (fmt == GBitmapFormats['GBitmapFormat8BitCircular'])

        if circular:
            geometry_ok = (stride == 0)
        else:
            geometry_ok = (stride * 8 >= width)
        return geometry_ok and (fmt in GBitmapFormats.values())
|
||
|
|
||
|
|
||
|
class GBitmap_addr(Recognizer):
    """ Recognizes the pixel data of recognized bitmaps.

    A block is accepted when it equals the 'addr' field of one of the GBitmap results.
    """
    depends_on = 'GBitmap'

    def is_type(self, data, search_blocks):
        bitmap_addrs = [bitmap['addr'] for bitmap in search_blocks]
        return data in bitmap_addrs
|
||
|
|
||
|
|
||
|
class GBitmap_palette(Recognizer):
    """ Recognizes the palettes of recognized bitmaps.

    A block is accepted when it equals the 'palette' field of one of the GBitmap
    results.
    """
    depends_on = 'GBitmap'

    def is_type(self, data, search_blocks):
        bitmap_palettes = [bitmap['palette'] for bitmap in search_blocks]
        return data in bitmap_palettes
|
||
|
|
||
|
|
||
|
# class AnimationAux(Recognizer):
|
||
|
# type = 'AnimationAuxState'
|
||
|
|
||
|
# def is_type(self, aux, search_blocks):
|
||
|
# possible_aux = [gdb.parse_and_eval('kernel_applib_get_animation_state().aux'),
|
||
|
# gdb.parse_and_eval('s_app_state.animation_state.aux')]
|
||
|
|
||
|
# return aux in possible_aux
|
||
|
|
||
|
|
||
|
# class Animation(Recognizer):
|
||
|
# type = 'AnimationPrivate'
|
||
|
|
||
|
# def is_type(self, animation, search_blocks):
|
||
|
# animations = []
|
||
|
|
||
|
# kernel_state = gdb.parse_and_eval('kernel_applib_get_animation_state()')
|
||
|
# animations.extend(LinkedList(kernel_state['unscheduled_head'].dereference()))
|
||
|
# animations.extend(LinkedList(kernel_state['scheduled_head'].dereference()))
|
||
|
|
||
|
# app_state = gdb.parse_and_eval('s_app_state.animation_state')
|
||
|
# animations.extend(LinkedList(app_state['unscheduled_head'].dereference()))
|
||
|
|
||
|
# return animation in animations
|