mirror of
https://github.com/google/pebble.git
synced 2025-03-15 00:31:21 +00:00
134 lines
3.8 KiB
Python
134 lines
3.8 KiB
Python
# Copyright 2024 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from __future__ import print_function
|
|
|
|
import argparse
|
|
import os
|
|
import readline
|
|
import threading
|
|
import sys
|
|
|
|
from pebble import pulse2, commander
|
|
from log_hashing.logdehash import LogDehash
|
|
|
|
|
|
PROMPT_STRING = '> '
|
|
|
|
|
|
def erase_current_line():
|
|
sys.stdout.write('\r'+' '*(len(readline.get_line_buffer())+2)+'\r')
|
|
sys.stdout.flush()
|
|
|
|
|
|
def handle_prompt_command(interface):
|
|
cmd = raw_input(PROMPT_STRING)
|
|
if not cmd:
|
|
return
|
|
|
|
link = interface.get_link()
|
|
prompt = commander.apps.Prompt(link)
|
|
|
|
try:
|
|
for line in prompt.command_and_response(cmd):
|
|
print(line)
|
|
except commander.exceptions.CommandTimedOut:
|
|
print("Command '%s' timed out" % cmd)
|
|
finally:
|
|
prompt.close()
|
|
|
|
|
|
def handle_log_messages(interface, dehasher):
|
|
logging = commander.apps.StreamingLogs(interface)
|
|
while True:
|
|
try:
|
|
msg = logging.receive(block=True)
|
|
except pulse2.exceptions.SocketClosed:
|
|
break
|
|
line_dict = dehasher.dehash(msg)
|
|
|
|
erase_current_line()
|
|
print(dehasher.commander_format_line(line_dict))
|
|
sys.stdout.write(PROMPT_STRING + readline.get_line_buffer())
|
|
sys.stdout.flush()
|
|
|
|
|
|
def start_logging_thread(*args):
|
|
log_thread = threading.Thread(target=handle_log_messages, args=args)
|
|
log_thread.daemon = True
|
|
log_thread.start()
|
|
|
|
|
|
def generate_dehash_arguments():
|
|
def yes_no_to_bool(arg):
|
|
return True if arg == 'yes' else False
|
|
|
|
args = {
|
|
'justify': 'small',
|
|
'color': False,
|
|
'bold': -1,
|
|
'print_core': False,
|
|
'dict_path': os.environ.get('PBL_CONSOLE_DICT_PATH', 'build/src/fw/loghash_dict.json')
|
|
}
|
|
|
|
arglist = os.getenv("PBL_CONSOLE_ARGS")
|
|
if arglist:
|
|
for arg in arglist.split(","):
|
|
if not arg:
|
|
break
|
|
key, value = arg.split('=')
|
|
if key == "--justify":
|
|
args['justify'] = value
|
|
elif key == "--color":
|
|
args['color'] = yes_no_to_bool(value)
|
|
elif key == "--bold":
|
|
args['bold'] = int(value)
|
|
elif key == "--dict":
|
|
args['dict_path'] = value
|
|
elif key == "--core":
|
|
args['print_core'] = yes_no_to_bool(value)
|
|
else:
|
|
raise Exception("Unknown console argument '{}'. Choices are ({})".
|
|
format(key, ['--justify', '--color', '--bold',
|
|
'--dict', '--core']))
|
|
|
|
return args
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='Pebble Console')
|
|
parser.add_argument('-t', '--tty', help='serial port (defaults to auto-detect)', metavar='TTY',
|
|
default=None)
|
|
|
|
args = parser.parse_args()
|
|
interface = pulse2.Interface.open_dbgserial(url=args.tty)
|
|
|
|
dehasher = LogDehash(**generate_dehash_arguments())
|
|
|
|
start_logging_thread(interface, dehasher)
|
|
|
|
print('--- PULSE terminal on %s ---' % args.tty)
|
|
print('--- Ctrl-C or Ctrl-D to exit ---')
|
|
|
|
try:
|
|
while True:
|
|
handle_prompt_command(interface)
|
|
except (KeyboardInterrupt, EOFError):
|
|
erase_current_line()
|
|
finally:
|
|
interface.close()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|