# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import argparse import array import glob import serial import struct import sys import prompt import stm32_crc as crc from hdlc import HDLCDecoder from serial_port_wrapper import SerialPortWrapper ''' This script will invoke the microphone test command on snowy, read the PCM encoded audio data from the serial port and store it to a .wav file, which can be played back from any audio player. It can be invoked via the './waf record' command or by running the script from the command line. The output sample rate and sample size when recording using the waf script can be configured by modifying the 'freq' and 'sample_size' variables, respectively. ''' FREQ = 8000 # Default Output sample rate from microphone (8000 or 16000) SAMPLE_SIZE = 2 # Default sample size: 1 for 8-bit data, 2 for 16-bit data samples (must match what is output by watch) seconds = 1 def store_wav(samples, filename, sample_size, freq): size = len(samples) * sample_size width = 8 * sample_size byte_rate = freq * sample_size wav = "RIFF" wav += struct.pack('i', size + 44) #file size wav += "WAVE" wav += "fmt " wav += struct.pack('i', 16) # fmt data length wav += struct.pack('h', 1) # audio format (1 = PCM) wav += struct.pack('h', 1) # channels wav += struct.pack('i', freq) # sample rate wav += struct.pack('i', byte_rate) # byte rate wav += struct.pack('h', sample_size) # block alignment (bytes per block) wav += struct.pack('h', width) # bits per sample wav += "data" wav += struct.pack('i', size) # data size for sample in samples: wav += struct.pack('B' if sample_size == 1 else 'H', sample) with open(filename, 'wb') as f: f.write(wav) f.close() print '{0} samples packed into {1}'.format(len(samples), filename) def receive_hdlc_data(s, sample_size): count = { 'i': 0 } decoder = HDLCDecoder() data = [] while True: raw_data = s.read(1000) if len(raw_data) == 0: break decoder.write(raw_data) for frame in iter(decoder.get_frame, None): count['i'] += 1 if len(frame) > 4: frame = bytearray(frame) frame_crc = frame[-4] | (frame[-3] << 8) | (frame[-2] << 16) | (frame[-1] << 24) if crc.crc32(array.array('B', frame[:-4]).tostring()) == frame_crc: data.extend(frame[:-4]) else: print count['i'] print 'total frames received =', str(count['i']) samples = [] for i in range(0, len(data), sample_size): try: d = 0 if sample_size == 1: d = data[i] elif sample_size == 2: d = data[i] | (data[i + 1] << 8) samples.append(d) except: print "conversion failed on word {0}".format(i/sample_size) return samples def open_serial_port(tty, baud_rate): s = serial.serial_for_url(tty, baud_rate, timeout=2) if s is not None: print 'opened',tty,'at',str(baud_rate),'bps' else: print 'failed to open',tty return s def record_from_tty(tty_prompt, tty_accessory, t=seconds, filename='test.wav', sample_size=SAMPLE_SIZE, sample_rate=FREQ, volume=100, accessory=False): if tty_prompt == tty_accessory: # sending commands via accessory connector, so set baud rate correctly s = SerialPortWrapper(tty_prompt, baud_rate=115200) else: s = SerialPortWrapper(tty_prompt) try: prompt.go_to_prompt(s) print 'record {0}-bit audio data for {1}s at {2}Hz (~{3} samples)'.format( '8' if sample_size == 1 else '16', t, str(sample_rate), t * sample_rate) cmd = 'mic start {0} {1} {2} {3}'.format(t, '8' if sample_size == 1 else '16', sample_rate, volume) prompt.issue_command(s, cmd) s.close() if (sample_size == 2) and (sample_rate == 16000): s = open_serial_port(tty_accessory, 460800) elif (sample_size == 2) or (sample_rate == 16000): s = open_serial_port(tty_accessory, 230400) else: s = open_serial_port(tty_accessory, 115200) samples = receive_hdlc_data(s, sample_size) print '{0} samples read'.format(len(samples)) if len(samples) != (sample_rate * t): print 'Not enough samples received ({0}/{1})'.format(len(samples), (sample_rate * t)) else: print 'Output file: ' + filename store_wav(samples, filename, sample_size, sample_rate) finally: s.close() if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--tty_prompt', type=str, help="Prompt tty (e.g. /dev/cu.usbserial-xxxxxxxB or /dev/ttyUSB0). If this" "is specified, then the accessory port must be specified") parser.add_argument('--tty_accessory', type=str, help="Accessory port tty (e.g. /dev/cu.usbserial-xxxxxxxB or /dev/ttyUSB0)." "If the commands are being sent via the accessory port, this and only this," "tty must be specified.") parser.add_argument('-o', '--output', type=str, default='test.wav', help="Output file name. Default: 'test.wav'") parser.add_argument('-d', '--duration', type=int, default=seconds, help="Number of seconds of audio that will be recorded. Default: 1s, Max: " "60s") parser.add_argument('-w', '--width', type=int, choices=[8, 16], default=16, help="Sample data width (8- or 16-bit). Default: 16-bit") parser.add_argument('-r', '--rate', type=int, choices=[8000, 16000], default=8000, help="Sample rate in Hz. Default: 8000") parser.add_argument('-v', '--volume', type=int, default=100, help="Volume (1 - 1000). Default: 100") args = parser.parse_args() if args.tty_accessory and not args.tty_prompt: tty_accessory = args.tty_accessory tty_prompt = args.tty_accessory elif args.tty_prompt and not args.tty_accessory: raise Exception("If the prompt tty is specified, the accessory port tty must be specified " "too!") elif not args.tty_prompt and not args.tty_accessory: import pebble_tty tty_prompt = pebble_tty.find_dbgserial_tty() tty_accessory = pebble_tty.find_accessory_tty() if not tty_prompt or not tty_accessory: raise Exception("Serial ports could not be resolved!") else: tty_accessory = args.tty_accessory tty_prompt = args.tty_prompt sample_size = args.width / 8 record_from_tty(tty_prompt, tty_accessory, args.duration, args.output, sample_size, args.rate, min(args.volume, 1000))