pebble/tools/qemu/qemu_gdb_proxy.py

716 lines
28 KiB
Python
Raw Permalink Normal View History

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
GDB server proxy for the QEMU emulator running a Pebble machine.
This proxy sits between gdb and the gdb server implemented in QEMU. Its primary purpose is to
implement support for the "info threads" and related gdb commands. The QEMU gdb server is not thread
aware and doesn't have any FreeRTOS knowledge such that it can figure out the FreeRTOS threads
created in the Pebble.
This proxy talks to the QEMU gdb server using primitive gdb remote commands and inspects the
FreeRTOS task structures to figure out which threads have been created, their saved registers, etc.
and then returns that information to gdb when it asks for thread info from the target system. For
most other requests recevied from gdb, this proxy simply acts as a passive pass thru to the QEMU gdb
server.
This module is designed to be run as a separate process from both QEMU and gdb. It connects to the
gdb socket created by QEMU and accepts connections from gdb. The intent is that this module would
be launched whenever QEMU is launched and likewise taken down whenever QEMU exits. To support this,
we exit this process whenever we detect that the QEMU gdb server connection has closed.
"""
import logging, socket
from struct import unpack
from time import sleep
import sys
import time
import argparse
import select
CTRL_C_CHARACTER = '\3'
##########################################################################################
class QemuGdbError(Exception):
pass
##########################################################################################
def byte_swap_uint32(val):
""" Return a byte-swapped 32-bit value """
return ( ((val & 0xFF000000) >> 24)
| ((val & 0x00FF0000) >> 8)
| ((val & 0x0000FF00) << 8)
| ((val & 0x000000FF) << 24))
##########################################################################################
class PebbleThread(object):
""" This class encapsulates the information about a thread on the Pebble """
# Mapping of register name to register index
reg_name_to_index = {name: num for num, name in enumerate(
'r0 r1 r2 r3 r4 r5 r6 r7 r8 r9 r10 r11 r12 sp lr pc xpsr'.split())}
# Offset of each register on the thread's stack
# stack_offset -> register_index
stack_offset_to_reg_index_v2 = [ # Used in Snowy, Cortex-M4
(0x28, 0), # r0
(0x2C, 1), # r1
(0x30, 2), # r2
(0x34, 3), # r3
(0x04, 4), # r4
(0x08, 5), # r5
(0x0C, 6), # r6
(0x10, 7), # r7
(0x14, 8), # r8
(0x18, 9), # r9
(0x1C, 10), # r10
(0x20, 11), # r11
(0x38, 12), # r12
(0x3C, 14), # lr
(0x40, 15), # pc
(0x44, 16), # xpsr
]
thread_state_size_v2 = 0x48
stack_offset_to_reg_index_v1 = [ # Used in Tintin, Cortex-M3
(0x24, 0), # r0
(0x28, 1), # r1
(0x2C, 2), # r2
(0x30, 3), # r3
(0x04, 4), # r4
(0x08, 5), # r5
(0x0C, 6), # r6
(0x10, 7), # r7
(0x14, 8), # r8
(0x18, 9), # r9
(0x1C, 10), # r10
(0x20, 11), # r11
(0x34, 12), # r12
(0x38, 14), # lr
(0x3C, 15), # pc
(0x40, 16), # xpsr
]
thread_state_size_v1 = 0x44
def __init__(self, id, ptr, running, name, registers):
self.id = id
self.ptr = ptr
self.running = running
self.name = name
self.registers = registers
def set_register(self, reg_index, value):
self.registers[reg_index] = value
def get_register(self, reg_index):
return self.registers[reg_index]
def __repr__(self):
return "<Thread id:%d, ptr:0x%08X, running:%r, name:%s, registers:%r" % (self.id, self.ptr,
self.running, self.name, self.registers)
##########################################################################################
class QemuGdbProxy(object):
"""
This class implements a GDB server listening for a gdb connection on a specific port.
It connects to and acts as a proxy to yet another gdb server running on the target system.
This proxy implements advanced gdb commands like getting thread info by looking up the values of
specific FreeRTOS symbols and querying the FreeRTOS data structures on the target system.
"""
##########################################################################################
def __init__(self, port, target_host, target_port, connect_timeout):
# The "target" is the remote system we are debugging. The target implements a basic
# gdb remote server
self.target_host = target_host
self.target_port = target_port
self.target_socket = None
self.connect_timeout = connect_timeout
# The "client" is gdb
self.client_accept_socket = None
self.client_accept_port = port
self.client_conn_socket = None
self.packet_size = 2048
self.active_thread_id = 0 # Selected by GDB
self.threads = {} # key is the thread id, value is a PebbleThread object
# The QEMU gdb remote server always assigns a thread ID of 1 to it's one and only thread
self.QEMU_MONITOR_CURRENT_THREAD_ID = 1
# Free RTOS symbols we need to look up in order to inspect FreeRTOS threads
symbol_list = [
"uxFreeRTOSRegisterStackingVersion",
"pxCurrentTCB",
"pxReadyTasksLists",
"xDelayedTaskList1",
"xDelayedTaskList2",
"pxDelayedTaskList",
"pxOverflowDelayedTaskList",
"xPendingReadyList",
"xTasksWaitingTermination",
"xSuspendedTaskList",
"uxCurrentNumberOfTasks",
]
self.symbol_dict = {symbol: None for symbol in symbol_list}
self.got_all_symbols = False
##########################################################################################
def _fetch_socket_data(self, timeout=None):
""" Fetch available data from our sockets (client and target). Block until any
data is available, or until the target connection is closed. If we detect that the
target connection has closed, we exit this app.
If we detect that the client connection (from gdb) has closed, we wait for a new connection
request from gdb.
retval:
(target_data, client_data)
"""
target_data = ''
client_data = ''
while (not target_data and not client_data):
# Form our read list. The target socket is always in the read list. Depending on if we
# are waiting for a client connection or not, we either put the client_accept_socket or
# client_conn_socket in the list.
if self.client_conn_socket is not None:
read_list = [self.target_socket, self.client_conn_socket]
else:
read_list = [self.target_socket, self.client_accept_socket]
readable, writable, errored = select.select(read_list, [], [], timeout)
# If nothing ready, we must have timed out
if not readable:
logging.debug("read timeout")
break
# Data available from target?
if self.target_socket in readable:
target_data = self.target_socket.recv(self.packet_size)
if not target_data:
raise QemuGdbError("target system disconnected")
logging.debug("got target data: '%s' (0x%s) " % (target_data,
target_data.encode('hex')))
# Data available from client?
if self.client_conn_socket is not None:
if self.client_conn_socket in readable:
client_data = self.client_conn_socket.recv(self.packet_size)
if not client_data:
logging.info("client connection closed")
self.client_conn_socket.close()
self.client_conn_socket = None
logging.debug("got client data: '%s' (0x%s) " % (client_data,
client_data.encode('hex')))
# Connection request from client?
else:
if self.client_accept_socket in readable:
self.client_conn_socket, _ = self.client_accept_socket.accept()
logging.info("Connected to client")
return (target_data, client_data)
##########################################################################################
def _create_packet(self, data):
checksum = sum(ord(c) for c in data) % 256
packet = "$%s#%02X" % (data, checksum)
logging.debug('--<<<<<<<<<<<< GDB packet: %s', packet)
return packet
##########################################################################################
def _target_read_register(self, reg_index):
""" Fetch the value of the given register index from the active thread """
try:
thread = self.threads[self.active_thread_id]
except KeyError:
raise QemuGdbError("Unknown thread id")
return thread.get_register(reg_index)
##########################################################################################
def _target_write_register(self, reg_index, value):
""" Update the value of the given register index in the active thread """
try:
thread = self.threads[self.active_thread_id]
except KeyError:
raise QemuGdbError("Unknown thread id")
print "TODO: NEED TO WRITE TO THREAD STACK ON TARGET TOO"
thread.set_register(reg_index, value)
##########################################################################################
def _target_read_memory(self, address, bytes):
request = self._create_packet('m %08X,%08X' % (address, bytes))
self.target_socket.send(request)
# read response
data = ''
while True:
target_data = self.target_socket.recv(self.packet_size)
if not target_data:
raise QemuGdbError("target system disconnected")
data += target_data
if "$" in data and "#" in data:
break
_, data = data.split('$', 1)
logging.debug("Received target response: %s" % (data))
resp = data.split('#', 1)[0]
if resp.startswith('E '):
raise QemuGdbError("Error response %s", resp)
return resp
##########################################################################################
def _target_read_uint32(self, address):
hex = self._target_read_memory(address, 4)
value = int(hex, 16)
return byte_swap_uint32(value)
##########################################################################################
def _target_read_uint8(self, address):
hex = self._target_read_memory(address, 1)
value = int(hex, 16)
return value
##########################################################################################
def _target_read_cstr(self, address, max_len):
str_hex = self._target_read_memory(address, max_len)
str = str_hex.decode('hex')
return str.split('\0', 1)[0]
##########################################################################################
def _target_collect_thread_info(self):
# FreeRTOS params used to collect thread info
FRTOS_LIST_NEXT_OFFSET = 16
FRTOS_LIST_WIDTH = 20
FRTOS_LIST_ELEM_NEXT_OFFSET = 8
FRTOS_LIST_ELEM_CONTENT_OFFSET = 12
FRTOS_THREAD_STACK_OFFSET = 0
FRTOS_THREAD_NAME_OFFSET = 84
FRTOS_MAX_PRIORITIES = 5
num_threads = self._target_read_uint32(self.symbol_dict['uxCurrentNumberOfTasks'])
# Figure out the register stacking
if self.symbol_dict['uxFreeRTOSRegisterStackingVersion']:
reg_stacking_version = self._target_read_uint8(
self.symbol_dict['uxFreeRTOSRegisterStackingVersion'])
else:
reg_stacking_version = 1
if reg_stacking_version == 1:
stack_offset_to_reg_index = PebbleThread.stack_offset_to_reg_index_v1
thread_state_size = PebbleThread.thread_state_size_v1
elif reg_stacking_version == 2:
stack_offset_to_reg_index = PebbleThread.stack_offset_to_reg_index_v2
thread_state_size = PebbleThread.thread_state_size_v2
else:
raise QemuGdbError("Unsupported uxFreeRTOSRegisterStackingVersion of %d" %
reg_stacking_version)
# Get total number of threads and current thread ID
num_threads = self._target_read_uint32(self.symbol_dict['uxCurrentNumberOfTasks'])
current_thread = self._target_read_uint32(self.symbol_dict['pxCurrentTCB'])
self.threads = {}
# Get the address of each list
list_addresses = []
address = self.symbol_dict['pxReadyTasksLists']
for i in range(FRTOS_MAX_PRIORITIES):
list_addresses.append(address + i * FRTOS_LIST_WIDTH)
for name in ['xDelayedTaskList1', 'xDelayedTaskList2', 'xPendingReadyList',
'xSuspendedTaskList', 'xTasksWaitingTermination']:
list_addresses.append(self.symbol_dict[name])
# Fetch the tasks from each list
for list in list_addresses:
thread_count = self._target_read_uint32(list)
if thread_count == 0:
continue
# Location of first item
elem_ptr = self._target_read_uint32(list + FRTOS_LIST_NEXT_OFFSET)
# Loop through the list
prev_elem_ptr = -1
while (thread_count > 0 and elem_ptr != 0 and elem_ptr != prev_elem_ptr
and len(self.threads) < num_threads):
thread_ptr = self._target_read_uint32(elem_ptr + FRTOS_LIST_ELEM_CONTENT_OFFSET)
thread_running = (thread_ptr == current_thread)
# The QEMU gdb server assigns the active thread a thread ID of 1 and if we change it
# to something else (like the TCB ptr), then things are not ideal. For example, gdb
# will display a "The current thread <Thread ID 1> has terminated" message.
# So, we will preserve 1 for the current thread and assign the TCB ptr for the
# others
if thread_running:
thread_id = self.QEMU_MONITOR_CURRENT_THREAD_ID
else:
thread_id = thread_ptr
thread_name = self._target_read_cstr(thread_ptr + FRTOS_THREAD_NAME_OFFSET, 32)
stack = self._target_read_uint32(thread_ptr + FRTOS_THREAD_STACK_OFFSET)
registers = [0] * len(PebbleThread.reg_name_to_index)
for (offset, reg_index) in stack_offset_to_reg_index:
registers[reg_index] = self._target_read_uint32(stack + offset)
registers[13] = stack + thread_state_size
# Create the thread instance
thread = PebbleThread(id=thread_id, ptr=thread_ptr, running=thread_running,
name=thread_name, registers=registers)
self.threads[thread_id] = thread
logging.debug("Got thread info: %r" % (thread))
# Another thread in this list?
prev_elem_ptr = elem_ptr
elem_ptr = self._target_read_uint32(elem_ptr + FRTOS_LIST_ELEM_NEXT_OFFSET)
thread_count -= 1
##########################################################################################
def _handle_set_active_thread_req(self, data):
num = int(data, 16)
if (num == -1): # All threads
return
elif (num == 0): # Any thread
num = self.QEMU_MONITOR_CURRENT_THREAD_ID
self.active_thread_id = num
return self._create_packet("OK")
##########################################################################################
def _handle_continue_req(self, msg):
""" The format of this is: 'vCont[;action[:thread-id]]...'
The QEMU gdb server only understands a thread id of 1, so if we pass it other thread ids,
it will barf.
"""
if ';' not in msg:
return None
action_thread_pair = msg.split(';')[1]
if ':' in action_thread_pair:
action = action_thread_pair.split(':')[0]
else:
action = action_thread_pair
# Send to target with the thread ID
packet = self._create_packet("vCont;%s" % (action))
self.target_socket.send(packet)
# Change back to active thread of 1
self.active_thread_id = self.QEMU_MONITOR_CURRENT_THREAD_ID
return ''
##########################################################################################
def _handle_thread_is_alive_req(self, data):
num = int(data, 16)
if (num == -1 or num == 0): # All threads
return self._create_packet("OK")
if num in self.threads:
return self._create_packet("OK")
return self._create_packet("E22")
##########################################################################################
def _handle_get_all_registers_req(self):
""" Get all registers for the active thread """
resp = ''
for i in range(len(PebbleThread.reg_name_to_index)):
value = self._target_read_register(i)
resp += "%08X" % (byte_swap_uint32(value))
return self._create_packet(resp)
##########################################################################################
def _handle_query_req(self, msg):
msg = msg.split('#')[0]
query = msg.split(':')
logging.debug('GDB received query: %s', query)
if query is None:
logging.error('GDB received query packet malformed')
return None
elif query[0] == 'C':
return self._create_packet("%d" % (self.active_thread_id))
elif query[0] == 'fThreadInfo':
if not self.got_all_symbols:
# NOTE: When running the 4.9 gcc tool chain, gdb asks for thread info right
# after attaching, before we have a chance to look up symbols, so respond
# with "last thread" if we don't have symbols yet.
return self._create_packet("l") # last
self._target_collect_thread_info()
# For some strange reason, if the active thread is first, the first "i thread" gdb
# command only displays that one thread, so reverse sort to put it at the end
id_strs = ("%016x" % id for id in sorted(self.threads.keys(), reverse=True))
return self._create_packet("m" + ",".join(id_strs))
elif query[0] == 'sThreadInfo':
return self._create_packet("l") # last
elif query[0].startswith('ThreadExtraInfo'):
id_str = query[0].split(',')[1]
id = int(id_str, 16)
found_thread = self.threads.get(id, None)
if found_thread is None:
resp = "<INVALID THREAD ID: %d>" % (id)
elif found_thread.running:
resp = "%s 0x%08X: Running" % (found_thread.name, found_thread.ptr)
else:
resp = "%s 0x%08X" % (found_thread.name, found_thread.ptr)
return self._create_packet(resp.encode('hex'))
elif 'Symbol' in query[0]:
if query[2] != '':
sym_name = query[2].decode('hex')
if query[1] != '':
sym_value = int(query[1], 16)
logging.debug("Setting value of symbol '%s' to 0x%08x" % (sym_name, sym_value))
self.symbol_dict[sym_name] = sym_value
else:
logging.debug("Could not find value of symbol '%s'" % (sym_name))
self.symbol_dict[sym_name] = ''
# Anymore we need to look up?
symbol = None
for x, y in self.symbol_dict.items():
if y is None:
symbol = x
break
if symbol is not None:
logging.debug("Asking gdb to lookup symbol %s" % (symbol))
return self._create_packet('qSymbol:%s' % (symbol.encode('hex')))
else:
self.got_all_symbols = True
return self._create_packet('OK')
else:
return None
##########################################################################################
def _handle_request(self, msg):
""" See if we want to handle a request directly here in the proxy
retval: resp,
resp: Response to return.
if None, proxy doesn't deal with the request directly
"""
logging.debug('-->>>>>>>>>>>> GDB req packet: %s', msg)
msg = msg.split('#')[0]
# query command
if msg[1] == 'q':
return self._handle_query_req(msg[2:])
elif msg[1] == 'H':
if msg[2] == 'c':
return None
else:
return self._handle_set_active_thread_req(msg[3:])
elif msg[1] == 'T':
return self._handle_thread_is_alive_req(msg[2:])
elif msg[1] == 'g':
if (self.active_thread_id <= 0
or self.active_thread_id == self.QEMU_MONITOR_CURRENT_THREAD_ID):
return None
else:
return self._handle_get_all_registers_req()
elif msg[1] == 'p':
# 'p <n>' : read value of register n
if self.active_thread_id == self.QEMU_MONITOR_CURRENT_THREAD_ID:
return None
else:
msg = msg[2:]
reg_num = int(msg, 16)
value = self._target_read_register(reg_num)
return self._create_packet("%08X" % (byte_swap_uint32(value)))
elif msg[1] == 'P':
# 'P <n>=<r>' : set value of register n to r
if self.active_thread_id == self.QEMU_MONITOR_CURRENT_THREAD_ID:
return None
else:
msg = msg[2:].split('=')
reg_num = int(msg[0], 16)
val = int(msg[1], 16)
val = byte_swap_uint32(val)
self._target_write_register(reg_num, val)
return self._create_packet("OK")
elif msg[1:].startswith('vCont'):
return self._handle_continue_req(msg[1:])
else:
return None
##########################################################################################
def run(self):
""" Run the proxy """
# Connect to the target system first
logging.info("Connecting to target system on %s:%s" % (self.target_host, self.target_port))
start_time = time.time()
connected = False
self.target_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
while not connected and (time.time() - start_time < self.connect_timeout):
try:
self.target_socket.connect((self.target_host, self.target_port))
connected = True
except socket.error:
self.target_socket.close()
self.target_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
time.sleep(0.1)
if not connected:
raise QemuGdbError("Unable to connect to target system on %s:%s. Is the emulator"
" running?" % (self.target_host, self.target_port))
logging.info("Connected to target system on %s:%s" % (self.target_host,
self.target_port))
# Open up our socket to accept connect requests from the client (gdb)
self.client_accept_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_accept_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.client_accept_socket.bind(('', self.client_accept_port))
self.client_accept_socket.listen(5)
# Empty out any unsolicited data sent from the target
(target_data, client_data) = self._fetch_socket_data(timeout=0.1)
# --------------------------------------------------------------------------------------
# Loop processing requests
data = ''
while True:
# read more data from client until we get at least one packet
while True:
(target_data, client_data) = self._fetch_socket_data()
# Pass through any response from the target back to gdb
if target_data and self.client_conn_socket is not None:
self.client_conn_socket.send(target_data)
# Ctrl-C interrupt?
if CTRL_C_CHARACTER in client_data:
self.target_socket.send(CTRL_C_CHARACTER)
client_data = client_data[client_data.index(CTRL_C_CHARACTER)+1:]
data += client_data
if "$" in data and "#" in data:
break
# Process all complete packets we have received from the client
while "$" in data and "#" in data:
data = data[data.index("$"):]
logging.debug("Processing remaining data: %s" % (data))
end = data.index("#") + 3 # 2 bytes of checksum
packet = data[0:end]
data = data[end:]
# decode and prepare resp
logging.debug("Processing packet: %s" % (packet))
resp = self._handle_request(packet)
# If it's nothing we care about, pass to target and return the response back to
# client
if resp is None:
logging.debug("Sending request to target: %s" % (packet))
self.target_socket.send(packet)
# else, we generated our own response that needs to go to the client
elif resp != '':
self.client_conn_socket.send('+' + resp)
# wait for ack from the client
(target_data, client_data) = self._fetch_socket_data()
if target_data:
self.client_conn_socket.send(target_data)
if client_data[0] != '+':
logging.debug('gdb client did not ack')
else:
logging.debug('gdb client acked')
# Add to our accumulated content
data += client_data[1:]
####################################################################################################
if __name__ == '__main__':
# Collect our command line arguments
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--port', type=int, default=1233,
help="Port to accept incomming connections on")
parser.add_argument('--target', default='localhost:1234',
help="target to connect to ")
parser.add_argument('--connect_timeout', type=float, default=1.0,
help="give up if we can't connect to the target within this timeout (sec)")
parser.add_argument('--debug', action='store_true',
help="Turn on debug logging")
args = parser.parse_args()
level = logging.INFO
if args.debug:
level = logging.DEBUG
logging.basicConfig(level=level)
(target_host, target_port) = args.target.split(':')
proxy = QemuGdbProxy(port=args.port, target_host=target_host, target_port=int(target_port),
connect_timeout=args.connect_timeout)
proxy.run()