mirror of
https://github.com/google/pebble.git
synced 2025-03-15 08:41:21 +00:00
192 lines
7.4 KiB
Python
192 lines
7.4 KiB
Python
|
# Copyright 2024 Google LLC
|
||
|
#
|
||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
# you may not use this file except in compliance with the License.
|
||
|
# You may obtain a copy of the License at
|
||
|
#
|
||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||
|
#
|
||
|
# Unless required by applicable law or agreed to in writing, software
|
||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
# See the License for the specific language governing permissions and
|
||
|
# limitations under the License.
|
||
|
|
||
|
import argparse
|
||
|
import array
|
||
|
import glob
|
||
|
import serial
|
||
|
import struct
|
||
|
import sys
|
||
|
|
||
|
import prompt
|
||
|
import stm32_crc as crc
|
||
|
from hdlc import HDLCDecoder
|
||
|
from serial_port_wrapper import SerialPortWrapper
|
||
|
|
||
|
'''
|
||
|
This script will invoke the microphone test command on snowy, read the
|
||
|
PCM encoded audio data from the serial port and store it to a .wav file,
|
||
|
which can be played back from any audio player. It can be invoked via
|
||
|
the './waf record' command or by running the script from the command line.
|
||
|
|
||
|
The output sample rate and sample size when recording using the waf script
|
||
|
can be configured by modifying the 'freq' and 'sample_size' variables,
|
||
|
respectively.
|
||
|
'''
|
||
|
|
||
|
FREQ = 8000 # Default Output sample rate from microphone (8000 or 16000)
|
||
|
SAMPLE_SIZE = 2 # Default sample size: 1 for 8-bit data, 2 for 16-bit data samples (must match what is output by watch)
|
||
|
seconds = 1
|
||
|
|
||
|
def store_wav(samples, filename, sample_size, freq):
|
||
|
size = len(samples) * sample_size
|
||
|
width = 8 * sample_size
|
||
|
byte_rate = freq * sample_size
|
||
|
|
||
|
wav = "RIFF"
|
||
|
wav += struct.pack('i', size + 44) #file size
|
||
|
wav += "WAVE"
|
||
|
wav += "fmt "
|
||
|
wav += struct.pack('i', 16) # fmt data length
|
||
|
wav += struct.pack('h', 1) # audio format (1 = PCM)
|
||
|
wav += struct.pack('h', 1) # channels
|
||
|
wav += struct.pack('i', freq) # sample rate
|
||
|
wav += struct.pack('i', byte_rate) # byte rate
|
||
|
wav += struct.pack('h', sample_size) # block alignment (bytes per block)
|
||
|
wav += struct.pack('h', width) # bits per sample
|
||
|
wav += "data"
|
||
|
wav += struct.pack('i', size) # data size
|
||
|
|
||
|
for sample in samples:
|
||
|
wav += struct.pack('B' if sample_size == 1 else 'H', sample)
|
||
|
|
||
|
with open(filename, 'wb') as f:
|
||
|
f.write(wav)
|
||
|
f.close()
|
||
|
print '{0} samples packed into {1}'.format(len(samples), filename)
|
||
|
|
||
|
def receive_hdlc_data(s, sample_size):
|
||
|
count = { 'i': 0 }
|
||
|
|
||
|
decoder = HDLCDecoder()
|
||
|
data = []
|
||
|
while True:
|
||
|
raw_data = s.read(1000)
|
||
|
if len(raw_data) == 0:
|
||
|
break
|
||
|
decoder.write(raw_data)
|
||
|
for frame in iter(decoder.get_frame, None):
|
||
|
count['i'] += 1
|
||
|
if len(frame) > 4:
|
||
|
frame = bytearray(frame)
|
||
|
frame_crc = frame[-4] | (frame[-3] << 8) | (frame[-2] << 16) | (frame[-1] << 24)
|
||
|
if crc.crc32(array.array('B', frame[:-4]).tostring()) == frame_crc:
|
||
|
data.extend(frame[:-4])
|
||
|
else:
|
||
|
print count['i']
|
||
|
|
||
|
print 'total frames received =', str(count['i'])
|
||
|
samples = []
|
||
|
for i in range(0, len(data), sample_size):
|
||
|
try:
|
||
|
d = 0
|
||
|
if sample_size == 1:
|
||
|
d = data[i]
|
||
|
elif sample_size == 2:
|
||
|
d = data[i] | (data[i + 1] << 8)
|
||
|
samples.append(d)
|
||
|
except:
|
||
|
print "conversion failed on word {0}".format(i/sample_size)
|
||
|
|
||
|
return samples
|
||
|
|
||
|
def open_serial_port(tty, baud_rate):
|
||
|
|
||
|
s = serial.serial_for_url(tty, baud_rate, timeout=2)
|
||
|
if s is not None:
|
||
|
print 'opened',tty,'at',str(baud_rate),'bps'
|
||
|
else:
|
||
|
print 'failed to open',tty
|
||
|
return s
|
||
|
|
||
|
def record_from_tty(tty_prompt, tty_accessory, t=seconds, filename='test.wav',
|
||
|
sample_size=SAMPLE_SIZE, sample_rate=FREQ, volume=100, accessory=False):
|
||
|
if tty_prompt == tty_accessory:
|
||
|
# sending commands via accessory connector, so set baud rate correctly
|
||
|
s = SerialPortWrapper(tty_prompt, baud_rate=115200)
|
||
|
else:
|
||
|
s = SerialPortWrapper(tty_prompt)
|
||
|
|
||
|
try:
|
||
|
prompt.go_to_prompt(s)
|
||
|
print 'record {0}-bit audio data for {1}s at {2}Hz (~{3} samples)'.format(
|
||
|
'8' if sample_size == 1 else '16', t, str(sample_rate), t * sample_rate)
|
||
|
cmd = 'mic start {0} {1} {2} {3}'.format(t, '8' if sample_size == 1 else '16', sample_rate,
|
||
|
volume)
|
||
|
prompt.issue_command(s, cmd)
|
||
|
s.close()
|
||
|
|
||
|
if (sample_size == 2) and (sample_rate == 16000):
|
||
|
s = open_serial_port(tty_accessory, 460800)
|
||
|
elif (sample_size == 2) or (sample_rate == 16000):
|
||
|
s = open_serial_port(tty_accessory, 230400)
|
||
|
else:
|
||
|
s = open_serial_port(tty_accessory, 115200)
|
||
|
|
||
|
samples = receive_hdlc_data(s, sample_size)
|
||
|
print '{0} samples read'.format(len(samples))
|
||
|
if len(samples) != (sample_rate * t):
|
||
|
print 'Not enough samples received ({0}/{1})'.format(len(samples), (sample_rate * t))
|
||
|
else:
|
||
|
print 'Output file: ' + filename
|
||
|
store_wav(samples, filename, sample_size, sample_rate)
|
||
|
|
||
|
finally:
|
||
|
s.close()
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
parser = argparse.ArgumentParser()
|
||
|
parser.add_argument('--tty_prompt', type=str,
|
||
|
help="Prompt tty (e.g. /dev/cu.usbserial-xxxxxxxB or /dev/ttyUSB0). If this"
|
||
|
"is specified, then the accessory port must be specified")
|
||
|
parser.add_argument('--tty_accessory', type=str,
|
||
|
help="Accessory port tty (e.g. /dev/cu.usbserial-xxxxxxxB or /dev/ttyUSB0)."
|
||
|
"If the commands are being sent via the accessory port, this and only this,"
|
||
|
"tty must be specified.")
|
||
|
parser.add_argument('-o', '--output', type=str, default='test.wav',
|
||
|
help="Output file name. Default: 'test.wav'")
|
||
|
parser.add_argument('-d', '--duration', type=int, default=seconds,
|
||
|
help="Number of seconds of audio that will be recorded. Default: 1s, Max: "
|
||
|
"60s")
|
||
|
parser.add_argument('-w', '--width', type=int, choices=[8, 16], default=16,
|
||
|
help="Sample data width (8- or 16-bit). Default: 16-bit")
|
||
|
parser.add_argument('-r', '--rate', type=int, choices=[8000, 16000], default=8000,
|
||
|
help="Sample rate in Hz. Default: 8000")
|
||
|
parser.add_argument('-v', '--volume', type=int, default=100,
|
||
|
help="Volume (1 - 1000). Default: 100")
|
||
|
args = parser.parse_args()
|
||
|
|
||
|
|
||
|
if args.tty_accessory and not args.tty_prompt:
|
||
|
tty_accessory = args.tty_accessory
|
||
|
tty_prompt = args.tty_accessory
|
||
|
elif args.tty_prompt and not args.tty_accessory:
|
||
|
raise Exception("If the prompt tty is specified, the accessory port tty must be specified "
|
||
|
"too!")
|
||
|
elif not args.tty_prompt and not args.tty_accessory:
|
||
|
import pebble_tty
|
||
|
|
||
|
tty_prompt = pebble_tty.find_dbgserial_tty()
|
||
|
tty_accessory = pebble_tty.find_accessory_tty()
|
||
|
if not tty_prompt or not tty_accessory:
|
||
|
raise Exception("Serial ports could not be resolved!")
|
||
|
else:
|
||
|
tty_accessory = args.tty_accessory
|
||
|
tty_prompt = args.tty_prompt
|
||
|
|
||
|
sample_size = args.width / 8
|
||
|
record_from_tty(tty_prompt, tty_accessory, args.duration, args.output, sample_size,
|
||
|
args.rate, min(args.volume, 1000))
|
||
|
|