# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

try:
    import gdb
except ImportError:
    raise Exception('This file is a GDB module.\n'
                    'It is not intended to be run outside of GDB.\n'
                    'Hint: to load a script in GDB, use `source this_file.py`')


import logging
import string
import types
import datetime

from collections import namedtuple, defaultdict, OrderedDict
from gdb_tintin import FreeRTOSMutex, Tasks, LinkedList
from gdb_symbols import get_static_variable, get_static_function
from gdb_tintin_metadata import TintinMetadata

logger = logging.getLogger(__name__)
recognizers = {}

_Recognizer = namedtuple('Recognizer', 'impl depends_on')


def register_recognizer(name, fn, dependency=None):
    """ Registers a recognizer.

    Recognizers are run against each block in the heap.
    They consist of a name, a dependency, and an implementation.
    The implementation consumes a block, a heap object, and a results dictionary and returns
    either a casted block or None.

    See Recognizer for auto-registering declarative classes.
    """
    recognizers[name] = _Recognizer(fn, dependency)


def parse_heap(heap, recognizer_subset=None):
    results = {
        'Free': heap.free_blocks()
    }

    heap_recognizers = recognizers
    if recognizer_subset:
        heap_recognizers = {name: recognizers[name] for name in recognizer_subset}

    ordered_recognizers, hidden = _order_recognizers(heap_recognizers)
    logger.info('Running: {}'.format(', '.join(ordered_recognizers.keys())))
    for name, recognizer in ordered_recognizers.iteritems():
        try:
            results[name] = filter(None, (recognizer.impl(block, heap, results) for
                                          block in heap.allocated_blocks()))
        except:
            print name + " hit an exception. Skipping"

    for dependency in hidden:
        del results[dependency]

    return results


def _order_recognizers(recognizer_subset):
    recognizer_subset = recognizer_subset.copy()
    ordered = OrderedDict()
    hidden = []
    last_length = -1

    # Run until we stop adding recognizers to our ordered list
    while recognizer_subset and len(ordered) != last_length:
        last_length = len(ordered)

        for name, recognizer in recognizer_subset.items():
            if not recognizer.depends_on or recognizer.depends_on in ordered:
                ordered[name] = recognizer
                del recognizer_subset[name]

    # Add implicit dependencies
    for name, recognizer in recognizer_subset.iteritems():
        if recognizer.depends_on not in ordered:
            logger.info('Adding dependency: {}'.format(recognizer.depends_on))
            ordered[recognizer.depends_on] = recognizers[recognizer.depends_on]
            hidden.append(recognizer.depends_on)
        ordered[name] = recognizer

    return [ordered, hidden]


class RecognizerType(type):
    def __new__(cls, name, bases, dct):
        for key in dct:
            # Convert instance methods into class methods
            if isinstance(dct[key], types.FunctionType):
                dct[key] = classmethod(dct[key])

        if dct.get('depends_on') == 'Heap':
            dct['depends_on'] = None
            dct['uses_heap'] = True
        else:
            dct['uses_heap'] = False

        return type.__new__(cls, name, bases, dct)

    def __init__(cls, name, bases, dct):
        if name != 'Recognizer':
            register_recognizer(name, cls, dct.get('depends_on'))

        super(RecognizerType, cls).__init__(name, bases, dct)

    def __call__(cls, block, heap, results):
        """ Returns either a casted block or None.

        This allows the class object itself to implement the recognizer protocol.
        This means we can register the class as a recognizer without creating an instance of it.
        """
        try:
            right_size = (block.size in heap.object_size(cls.type)) if cls.type else True
        except gdb.error:
            # Type doesn't exist
            return None

        if cls.uses_heap:
            search_blocks = [r_block.data for r_block in heap]
        else:
            search_blocks = [r_block.data for r_block in results.get(cls.depends_on) or []]

        if right_size and cls.is_type(block.cast(cls.type), search_blocks):
            return block.cast(cls.type, clone=True)
        else:
            return None

    def __str__(cls):
        return cls.__name__


class Recognizer(object):
    """ This is a declarative recognizer. It auto-registers with the recognizer dictionary.

    Note that declarative recognizers are singletons that don't get instantiated, so
    __init__ is never called and any modifications to class members are persistent.
    This behavior holds for subclasses.

    Example:
    >>> # A Recognizer's class name is its friendly name.
    ... class Test(Recognizer):
    ...
    ...     # Its (optional) type is the C struct it's searching for.
    ...     type = 'TestStruct'
    ...
    ...     # Its (optional) dependency is the friendly name of the type
    ...     # whose results it depends on. Those results are passed to
    ...     # is_type as search_blocks.
    ...     depends_on = 'Heap'
    ...
    ...     # is_type() is the comparison function. If type is not NULL,
    ...     # each block will be casted to 'type'. Note that self actually
    ...     # refers to the class.
    ...     def is_type(self, block, search_blocks):
    ...         return block['foo'] == 4
    """
    __metaclass__ = RecognizerType

    type = None
    depends_on = None

    def is_type(self, block, search_blocks):
        return True


# --- Recognizers --- #


# --- Mutexes --- #
class PebbleMutex(Recognizer):
    type = 'PebbleMutexCommon'

    def is_type(self, pebble_mutex, search_blocks):
        mutex = FreeRTOSMutex(pebble_mutex['freertos_mutex'])
        return ((not mutex.locked()) == (pebble_mutex['lr'] == 0) and
                (0 <= mutex.num_waiters() <= 10))


class LightMutex(Recognizer):
    type = 'LightMutex_t'
    depends_on = 'PebbleMutex'

    def is_type(self, mutex, search_blocks):
        return mutex in [block['freertos_mutex'] for block in search_blocks]


# --- Queues --- #
class Queue(Recognizer):
    depends_on = 'Heap'

    def is_type(self, data, search_blocks):
        queue_type = gdb.lookup_type('Queue_t')
        queue = data.cast(queue_type.pointer())
        queue_size = int(queue_type.sizeof)

        storage_size = queue['uxLength'] * queue['uxItemSize']
        correct_head = (queue['pcHead'] >= data)
        correct_tail = (queue['pcTail'] == queue['pcHead'] + storage_size)
        return (queue['uxLength'] > 0 and queue['pcWriteTo'] != 0 and correct_head and correct_tail)


class Semaphore(Recognizer):
    type = 'Queue_t'
    depends_on = 'Queue'

    def is_type(self, semaphore, search_blocks):
        return semaphore['uxItemSize'] == 0 and semaphore in search_blocks


# --- Data Logging --- #
class DataLoggingSession(Recognizer):
    type = 'DataLoggingSession'

    def is_type(self, dls, search_blocks):
        total_tasks = Tasks().total
        return 0 <= dls['task'] < total_tasks


class ActiveDLS(Recognizer):
    type = 'DataLoggingActiveState'
    depends_on = 'DataLoggingSession'

    def is_type(self, active_dls, search_blocks):
        return active_dls in [block['data'] for block in search_blocks]


class DLSBuffer(Recognizer):
    depends_on = 'ActiveDLS'

    def is_type(self, dls_buffer, search_blocks):
        return dls_buffer in [block['buffer_storage'] for block in search_blocks]


# --- Tasks --- #
class Task(Recognizer):
    type = 'TCB_t'

    def is_type(self, task, search_blocks):
        return is_string(task['pcTaskName'])


class TaskStack(Recognizer):
    depends_on = 'Task'

    def is_type(self, stack, search_blocks):
        return stack in [block['pxStack'] for block in search_blocks]


# --- String-related --- #
def is_string(data):
    try:
        data = data.string().decode('utf-8')
        return len(data) > 2 and all(ord(codepoint) > 127 or
                                     codepoint in string.printable for codepoint in data)
    except UnicodeDecodeError:
        return False
    except UnicodeEncodeError:
        return False


class String(Recognizer):

    def is_type(self, string, search_blocks):
        return is_string(string.cast(gdb.lookup_type('char').pointer()))


class PFSFileNode(Recognizer):
    type = 'PFSFileChangedCallbackNode'
    depends_on = 'String'

    def is_type(self, filenode, search_blocks):
        return filenode['name'] in search_blocks


class SettingsFile(Recognizer):
    type = 'SettingsFile'
    depends_on = 'String'

    def is_type(self, settings, search_blocks):
        try:
            timestamp = int(settings['last_modified'])
            date = datetime.datetime.fromtimestamp(timestamp)
        except ValueError:
            return False

        # If a user closes a settings file but forgets to free the
        # underlying memory the name will not be in search_blocks
        return (settings['max_used_space'] <= settings['max_space_total'] and
                settings['used_space'] <= settings['max_used_space'] and
                date.year > 2010 and date.year < 2038)


# --- Analytics --- #
class AnalyticsStopwatch(Recognizer):
    type = 'AnalyticsStopwatchNode'

    def is_type(self, stopwatch, search_blocks):
        s_stopwatch_list = get_static_variable('s_stopwatch_list')
        stopwatch_list = LinkedList(gdb.parse_and_eval(s_stopwatch_list).dereference())
        return stopwatch in stopwatch_list


class AnalyticsHeartbeatList(Recognizer):
    type = 'AnalyticsHeartbeatList'

    def is_type(self, listnode, search_blocks):
        s_app_heartbeat_list = get_static_variable('s_app_heartbeat_list')
        heartbeat_list = LinkedList(gdb.parse_and_eval(s_app_heartbeat_list)['node'])
        return listnode in heartbeat_list


class AnalyticsHeartbeat(Recognizer):
    depends_on = 'AnalyticsHeartbeatList'

    def is_type(self, heartbeat, search_blocks):
        s_device_heartbeat = get_static_variable('s_device_heartbeat')
        device_heartbeat = gdb.parse_and_eval(s_device_heartbeat)
        return (heartbeat == device_heartbeat or
                heartbeat in [block['heartbeat'] for block in search_blocks])


# --- Timers --- #
class TaskTimer(Recognizer):
    type = 'TaskTimer'

    def is_type(self, timer, search_blocks):
        timer_manager_ref = get_static_variable('s_task_timer_manager')
        timer_manager = gdb.parse_and_eval(timer_manager_ref)
        max_timer_id = int(timer_manager["next_id"]) - 1
        max_timer_id = int() - 1
        return 1 <= timer['id'] <= max_timer_id


class EventedTimer(Recognizer):
    type = 'EventedTimer'

    def is_type(self, timer, search_blocks):
        timer_manager_ref = get_static_variable('s_task_timer_manager')
        timer_manager = gdb.parse_and_eval(timer_manager_ref)
        max_timer_id = int(timer_manager["next_id"]) - 1
        total_tasks = Tasks().total

        return (1 <= timer['sys_timer_id'] <= max_timer_id
                and 0 <= timer['target_task'] < total_tasks)


# --- Communication --- #
class CommSession(Recognizer):
    type = 'CommSession'
    depends_on = 'Heap'

    def try_session_address(self, var_name):
        try:
            var_ref = get_static_variable(var_name)
            return gdb.parse_and_eval(var_ref).address
        except:
            return None


    def is_type(self, session, search_blocks):
        meta = TintinMetadata()
        hw_platform = meta.hw_platform()

        transport_imp = session['transport_imp'].dereference().address

        iap = self.try_session_address('s_iap_transport_implementation')
        spp = self.try_session_address('s_plain_spp_transport_implementation')
        ppogatt = self.try_session_address('s_ppogatt_transport_implementation')
        qemu = self.try_session_address('s_qemu_transport_implementation')
        pulse_pp = self.try_session_address('s_pulse_transport_implementation')

        return transport_imp in [iap, spp, ppogatt, qemu, pulse_pp]


# --- Windows --- #
class NotificationNode(Recognizer):
    type = 'NotifList'

    def is_type(self, node, search_blocks):
        s_presented_notifs = get_static_variable('s_presented_notifs')
        notification_list = LinkedList(gdb.parse_and_eval(s_presented_notifs)['list_node'])
        return node in notification_list


class Modal(Recognizer):
    type = 'WindowStackItem'

    def is_type(self, node, search_blocks):
        s_modal_window_stack = get_static_variable('s_modal_window_stack')
        modal_stack = LinkedList(gdb.parse_and_eval(s_modal_window_stack)['node'])
        return node in modal_stack


# --- Misc --- #
class EventService(Recognizer):
    type = 'EventServiceEntry'

    def is_type(self, entry, search_blocks):
        subscribers = 0
        for x in xrange(Tasks().total):
            if entry['subscribers'][x] != 0:
                subscribers += 1
        return entry['num_subscribers'] == subscribers


class VoiceEncoder(Recognizer):
    type = 'VoiceEncoder'


class AlgState(Recognizer):
    type = 'AlgState'

    def is_type(self, state, search_blocks):
        s_alg_state_ref = get_static_variable('s_alg_state')
        s_alg_state = gdb.parse_and_eval(s_alg_state_ref)
        return (state.dereference().address == s_alg_state)


class KAlgState(Recognizer):
    type = 'KAlgState'

    def is_type(self, state, search_blocks):
        s_alg_state_ref = get_static_variable('s_alg_state')
        s_alg_state = gdb.parse_and_eval(s_alg_state_ref)
        return (s_alg_state["k_state"] == state.dereference().address)


class CachedResource(Recognizer):
    type = 'CachedResource'

    def is_type(self, resource, search_blocks):
        s_resource_list = get_static_variable('s_resource_list')
        resources = LinkedList(gdb.parse_and_eval(s_resource_list)['list_node'])
        return resource in resources


# --- Applib --- #
class ModalWindow(Recognizer):
    depends_on = 'Modal'

    def is_type(self, window, search_blocks):
        # Note that these are most likely dialogs. Try casting them.
        return window in [block['modal_window'] for block in search_blocks]

try:
    GBitmapFormats = gdb.types.make_enum_dict(gdb.lookup_type("enum GBitmapFormat"))
except gdb.error:
    GBitmapFormats = None

# FIXME: This can be improved. It results in a lot of false negatives
class GBitmap(Recognizer):
    type = 'GBitmap'

    def is_type(self, bitmap, search_blocks):
        if GBitmapFormats is None:
            return False

        row_size_bytes = bitmap['row_size_bytes']
        bounds_width = bitmap['bounds']['size']['w']
        format = int(bitmap['info']['format'])
        is_circular_format = (format == GBitmapFormats['GBitmapFormat8BitCircular'])
        is_valid_circular = (is_circular_format and row_size_bytes == 0)
        is_valid_rect = (not is_circular_format and row_size_bytes * 8 >= bounds_width)
        return (is_valid_circular or is_valid_rect) and (format in GBitmapFormats.values())


class GBitmap_addr(Recognizer):
    depends_on = 'GBitmap'

    def is_type(self, data, search_blocks):
        return data in [block['addr'] for block in search_blocks]


class GBitmap_palette(Recognizer):
    depends_on = 'GBitmap'

    def is_type(self, data, search_blocks):
        return data in [block['palette'] for block in search_blocks]


# class AnimationAux(Recognizer):
#     type = 'AnimationAuxState'

#     def is_type(self, aux, search_blocks):
#         possible_aux = [gdb.parse_and_eval('kernel_applib_get_animation_state().aux'),
#                         gdb.parse_and_eval('s_app_state.animation_state.aux')]

#         return aux in possible_aux


# class Animation(Recognizer):
#     type = 'AnimationPrivate'

#     def is_type(self, animation, search_blocks):
#         animations = []

#         kernel_state = gdb.parse_and_eval('kernel_applib_get_animation_state()')
#         animations.extend(LinkedList(kernel_state['unscheduled_head'].dereference()))
#         animations.extend(LinkedList(kernel_state['scheduled_head'].dereference()))

#         app_state = gdb.parse_and_eval('s_app_state.animation_state')
#         animations.extend(LinkedList(app_state['unscheduled_head'].dereference()))

#         return animation in animations