# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ GDB server proxy for the QEMU emulator running a Pebble machine. This proxy sits between gdb and the gdb server implemented in QEMU. Its primary purpose is to implement support for the "info threads" and related gdb commands. The QEMU gdb server is not thread aware and doesn't have any FreeRTOS knowledge such that it can figure out the FreeRTOS threads created in the Pebble. This proxy talks to the QEMU gdb server using primitive gdb remote commands and inspects the FreeRTOS task structures to figure out which threads have been created, their saved registers, etc. and then returns that information to gdb when it asks for thread info from the target system. For most other requests recevied from gdb, this proxy simply acts as a passive pass thru to the QEMU gdb server. This module is designed to be run as a separate process from both QEMU and gdb. It connects to the gdb socket created by QEMU and accepts connections from gdb. The intent is that this module would be launched whenever QEMU is launched and likewise taken down whenever QEMU exits. To support this, we exit this process whenever we detect that the QEMU gdb server connection has closed. """ import logging, socket from struct import unpack from time import sleep import sys import time import argparse import select CTRL_C_CHARACTER = '\3' ########################################################################################## class QemuGdbError(Exception): pass ########################################################################################## def byte_swap_uint32(val): """ Return a byte-swapped 32-bit value """ return ( ((val & 0xFF000000) >> 24) | ((val & 0x00FF0000) >> 8) | ((val & 0x0000FF00) << 8) | ((val & 0x000000FF) << 24)) ########################################################################################## class PebbleThread(object): """ This class encapsulates the information about a thread on the Pebble """ # Mapping of register name to register index reg_name_to_index = {name: num for num, name in enumerate( 'r0 r1 r2 r3 r4 r5 r6 r7 r8 r9 r10 r11 r12 sp lr pc xpsr'.split())} # Offset of each register on the thread's stack # stack_offset -> register_index stack_offset_to_reg_index_v2 = [ # Used in Snowy, Cortex-M4 (0x28, 0), # r0 (0x2C, 1), # r1 (0x30, 2), # r2 (0x34, 3), # r3 (0x04, 4), # r4 (0x08, 5), # r5 (0x0C, 6), # r6 (0x10, 7), # r7 (0x14, 8), # r8 (0x18, 9), # r9 (0x1C, 10), # r10 (0x20, 11), # r11 (0x38, 12), # r12 (0x3C, 14), # lr (0x40, 15), # pc (0x44, 16), # xpsr ] thread_state_size_v2 = 0x48 stack_offset_to_reg_index_v1 = [ # Used in Tintin, Cortex-M3 (0x24, 0), # r0 (0x28, 1), # r1 (0x2C, 2), # r2 (0x30, 3), # r3 (0x04, 4), # r4 (0x08, 5), # r5 (0x0C, 6), # r6 (0x10, 7), # r7 (0x14, 8), # r8 (0x18, 9), # r9 (0x1C, 10), # r10 (0x20, 11), # r11 (0x34, 12), # r12 (0x38, 14), # lr (0x3C, 15), # pc (0x40, 16), # xpsr ] thread_state_size_v1 = 0x44 def __init__(self, id, ptr, running, name, registers): self.id = id self.ptr = ptr self.running = running self.name = name self.registers = registers def set_register(self, reg_index, value): self.registers[reg_index] = value def get_register(self, reg_index): return self.registers[reg_index] def __repr__(self): return " 0 and elem_ptr != 0 and elem_ptr != prev_elem_ptr and len(self.threads) < num_threads): thread_ptr = self._target_read_uint32(elem_ptr + FRTOS_LIST_ELEM_CONTENT_OFFSET) thread_running = (thread_ptr == current_thread) # The QEMU gdb server assigns the active thread a thread ID of 1 and if we change it # to something else (like the TCB ptr), then things are not ideal. For example, gdb # will display a "The current thread has terminated" message. # So, we will preserve 1 for the current thread and assign the TCB ptr for the # others if thread_running: thread_id = self.QEMU_MONITOR_CURRENT_THREAD_ID else: thread_id = thread_ptr thread_name = self._target_read_cstr(thread_ptr + FRTOS_THREAD_NAME_OFFSET, 32) stack = self._target_read_uint32(thread_ptr + FRTOS_THREAD_STACK_OFFSET) registers = [0] * len(PebbleThread.reg_name_to_index) for (offset, reg_index) in stack_offset_to_reg_index: registers[reg_index] = self._target_read_uint32(stack + offset) registers[13] = stack + thread_state_size # Create the thread instance thread = PebbleThread(id=thread_id, ptr=thread_ptr, running=thread_running, name=thread_name, registers=registers) self.threads[thread_id] = thread logging.debug("Got thread info: %r" % (thread)) # Another thread in this list? prev_elem_ptr = elem_ptr elem_ptr = self._target_read_uint32(elem_ptr + FRTOS_LIST_ELEM_NEXT_OFFSET) thread_count -= 1 ########################################################################################## def _handle_set_active_thread_req(self, data): num = int(data, 16) if (num == -1): # All threads return elif (num == 0): # Any thread num = self.QEMU_MONITOR_CURRENT_THREAD_ID self.active_thread_id = num return self._create_packet("OK") ########################################################################################## def _handle_continue_req(self, msg): """ The format of this is: 'vCont[;action[:thread-id]]...' The QEMU gdb server only understands a thread id of 1, so if we pass it other thread ids, it will barf. """ if ';' not in msg: return None action_thread_pair = msg.split(';')[1] if ':' in action_thread_pair: action = action_thread_pair.split(':')[0] else: action = action_thread_pair # Send to target with the thread ID packet = self._create_packet("vCont;%s" % (action)) self.target_socket.send(packet) # Change back to active thread of 1 self.active_thread_id = self.QEMU_MONITOR_CURRENT_THREAD_ID return '' ########################################################################################## def _handle_thread_is_alive_req(self, data): num = int(data, 16) if (num == -1 or num == 0): # All threads return self._create_packet("OK") if num in self.threads: return self._create_packet("OK") return self._create_packet("E22") ########################################################################################## def _handle_get_all_registers_req(self): """ Get all registers for the active thread """ resp = '' for i in range(len(PebbleThread.reg_name_to_index)): value = self._target_read_register(i) resp += "%08X" % (byte_swap_uint32(value)) return self._create_packet(resp) ########################################################################################## def _handle_query_req(self, msg): msg = msg.split('#')[0] query = msg.split(':') logging.debug('GDB received query: %s', query) if query is None: logging.error('GDB received query packet malformed') return None elif query[0] == 'C': return self._create_packet("%d" % (self.active_thread_id)) elif query[0] == 'fThreadInfo': if not self.got_all_symbols: # NOTE: When running the 4.9 gcc tool chain, gdb asks for thread info right # after attaching, before we have a chance to look up symbols, so respond # with "last thread" if we don't have symbols yet. return self._create_packet("l") # last self._target_collect_thread_info() # For some strange reason, if the active thread is first, the first "i thread" gdb # command only displays that one thread, so reverse sort to put it at the end id_strs = ("%016x" % id for id in sorted(self.threads.keys(), reverse=True)) return self._create_packet("m" + ",".join(id_strs)) elif query[0] == 'sThreadInfo': return self._create_packet("l") # last elif query[0].startswith('ThreadExtraInfo'): id_str = query[0].split(',')[1] id = int(id_str, 16) found_thread = self.threads.get(id, None) if found_thread is None: resp = "" % (id) elif found_thread.running: resp = "%s 0x%08X: Running" % (found_thread.name, found_thread.ptr) else: resp = "%s 0x%08X" % (found_thread.name, found_thread.ptr) return self._create_packet(resp.encode('hex')) elif 'Symbol' in query[0]: if query[2] != '': sym_name = query[2].decode('hex') if query[1] != '': sym_value = int(query[1], 16) logging.debug("Setting value of symbol '%s' to 0x%08x" % (sym_name, sym_value)) self.symbol_dict[sym_name] = sym_value else: logging.debug("Could not find value of symbol '%s'" % (sym_name)) self.symbol_dict[sym_name] = '' # Anymore we need to look up? symbol = None for x, y in self.symbol_dict.items(): if y is None: symbol = x break if symbol is not None: logging.debug("Asking gdb to lookup symbol %s" % (symbol)) return self._create_packet('qSymbol:%s' % (symbol.encode('hex'))) else: self.got_all_symbols = True return self._create_packet('OK') else: return None ########################################################################################## def _handle_request(self, msg): """ See if we want to handle a request directly here in the proxy retval: resp, resp: Response to return. if None, proxy doesn't deal with the request directly """ logging.debug('-->>>>>>>>>>>> GDB req packet: %s', msg) msg = msg.split('#')[0] # query command if msg[1] == 'q': return self._handle_query_req(msg[2:]) elif msg[1] == 'H': if msg[2] == 'c': return None else: return self._handle_set_active_thread_req(msg[3:]) elif msg[1] == 'T': return self._handle_thread_is_alive_req(msg[2:]) elif msg[1] == 'g': if (self.active_thread_id <= 0 or self.active_thread_id == self.QEMU_MONITOR_CURRENT_THREAD_ID): return None else: return self._handle_get_all_registers_req() elif msg[1] == 'p': # 'p ' : read value of register n if self.active_thread_id == self.QEMU_MONITOR_CURRENT_THREAD_ID: return None else: msg = msg[2:] reg_num = int(msg, 16) value = self._target_read_register(reg_num) return self._create_packet("%08X" % (byte_swap_uint32(value))) elif msg[1] == 'P': # 'P =' : set value of register n to r if self.active_thread_id == self.QEMU_MONITOR_CURRENT_THREAD_ID: return None else: msg = msg[2:].split('=') reg_num = int(msg[0], 16) val = int(msg[1], 16) val = byte_swap_uint32(val) self._target_write_register(reg_num, val) return self._create_packet("OK") elif msg[1:].startswith('vCont'): return self._handle_continue_req(msg[1:]) else: return None ########################################################################################## def run(self): """ Run the proxy """ # Connect to the target system first logging.info("Connecting to target system on %s:%s" % (self.target_host, self.target_port)) start_time = time.time() connected = False self.target_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) while not connected and (time.time() - start_time < self.connect_timeout): try: self.target_socket.connect((self.target_host, self.target_port)) connected = True except socket.error: self.target_socket.close() self.target_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) time.sleep(0.1) if not connected: raise QemuGdbError("Unable to connect to target system on %s:%s. Is the emulator" " running?" % (self.target_host, self.target_port)) logging.info("Connected to target system on %s:%s" % (self.target_host, self.target_port)) # Open up our socket to accept connect requests from the client (gdb) self.client_accept_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.client_accept_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.client_accept_socket.bind(('', self.client_accept_port)) self.client_accept_socket.listen(5) # Empty out any unsolicited data sent from the target (target_data, client_data) = self._fetch_socket_data(timeout=0.1) # -------------------------------------------------------------------------------------- # Loop processing requests data = '' while True: # read more data from client until we get at least one packet while True: (target_data, client_data) = self._fetch_socket_data() # Pass through any response from the target back to gdb if target_data and self.client_conn_socket is not None: self.client_conn_socket.send(target_data) # Ctrl-C interrupt? if CTRL_C_CHARACTER in client_data: self.target_socket.send(CTRL_C_CHARACTER) client_data = client_data[client_data.index(CTRL_C_CHARACTER)+1:] data += client_data if "$" in data and "#" in data: break # Process all complete packets we have received from the client while "$" in data and "#" in data: data = data[data.index("$"):] logging.debug("Processing remaining data: %s" % (data)) end = data.index("#") + 3 # 2 bytes of checksum packet = data[0:end] data = data[end:] # decode and prepare resp logging.debug("Processing packet: %s" % (packet)) resp = self._handle_request(packet) # If it's nothing we care about, pass to target and return the response back to # client if resp is None: logging.debug("Sending request to target: %s" % (packet)) self.target_socket.send(packet) # else, we generated our own response that needs to go to the client elif resp != '': self.client_conn_socket.send('+' + resp) # wait for ack from the client (target_data, client_data) = self._fetch_socket_data() if target_data: self.client_conn_socket.send(target_data) if client_data[0] != '+': logging.debug('gdb client did not ack') else: logging.debug('gdb client acked') # Add to our accumulated content data += client_data[1:] #################################################################################################### if __name__ == '__main__': # Collect our command line arguments parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--port', type=int, default=1233, help="Port to accept incomming connections on") parser.add_argument('--target', default='localhost:1234', help="target to connect to ") parser.add_argument('--connect_timeout', type=float, default=1.0, help="give up if we can't connect to the target within this timeout (sec)") parser.add_argument('--debug', action='store_true', help="Turn on debug logging") args = parser.parse_args() level = logging.INFO if args.debug: level = logging.DEBUG logging.basicConfig(level=level) (target_host, target_port) = args.target.split(':') proxy = QemuGdbProxy(port=args.port, target_host=target_host, target_port=int(target_port), connect_timeout=args.connect_timeout) proxy.run()