mirror of
https://github.com/google/pebble.git
synced 2025-03-15 08:41:21 +00:00
240 lines
8.4 KiB
Python
Executable file
240 lines
8.4 KiB
Python
Executable file
#!/usr/bin/env python
|
|
# Copyright 2024 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import argparse
|
|
import csv
|
|
import glob
|
|
import numpy
|
|
import pylab
|
|
import re
|
|
import scipy.interpolate
|
|
import scipy.signal
|
|
import sys
|
|
from collections import defaultdict
|
|
from pprint import pprint
|
|
|
|
# File-name endings (placed right before the ".csv" extension) that tell the
# two kinds of capture apart: voltage logs and their companion current logs.
VOLTAGE_SUFFIX = "_voltage"
CURRENT_SUFFIX = "_VBAT"
|
|
|
|
def get_pair(voltage_path):
    """Return ``[voltage_path, current_path]`` for a voltage CSV path.

    The current path is derived by replacing a trailing
    ``<VOLTAGE_SUFFIX>.csv`` with ``<CURRENT_SUFFIX>.csv``.

    Args:
        voltage_path: Path of a voltage CSV file.

    Returns:
        A two-item list: the path as given, followed by the derived current
        path.  When the path does not end in the voltage suffix the second
        item equals the first.
    """
    # Raw string + re.escape: '\w' inside a plain literal is an invalid escape
    # sequence on Python 3, and the bare '.' used to match any character.
    # The '$' anchor restricts the substitution to the end of the path so a
    # directory that happens to contain the suffix is left alone.
    pattern = r'(\w+)%s\.csv$' % re.escape(VOLTAGE_SUFFIX)
    current_path = re.sub(
        pattern,
        lambda match: match.group(1) + CURRENT_SUFFIX + '.csv',
        voltage_path)
    return [voltage_path, current_path]
|
|
|
|
def scrape_path(path, paths, output):
    """Classify one CSV path and append the matching entry to ``output``.

    * A path ending in ``<VOLTAGE_SUFFIX>.csv`` is appended as the
      ``[voltage_path, current_path]`` pair built by ``get_pair``.
    * A path ending in ``<CURRENT_SUFFIX>.csv`` is ignored.
    * Any other path is appended as ``[path, '']``.

    Args:
        path: The CSV path to classify.
        paths: The list ``path`` was taken from.  Kept so existing calls keep
            working; it is intentionally left unmodified (see below).
        output: List that receives the resulting entries.
    """
    if path.endswith('%s.csv' % VOLTAGE_SUFFIX):
        # This used to call paths.remove() on the path being processed.  The
        # callers in this file invoke scrape_path while looping over ``paths``,
        # so removing the current element shifted the list under the iterator
        # and the entry that followed was silently skipped.
        output.append(get_pair(path))
    elif not path.endswith('%s.csv' % CURRENT_SUFFIX):
        output.append([path, ''])
|
|
|
|
def scrape_directory(output, path=''):
    """Pass every ``*.csv`` file matching the ``path`` prefix to ``scrape_path``.

    Args:
        output: List that ``scrape_path`` appends its entries to.
        path: Prefix put in front of ``*.csv`` in the glob pattern (for
            example ``'logs/'``).  The empty default globs the current
            directory.

    Returns:
        An empty list.  The returned list was never populated, so that result
        is preserved for any caller that inspects it.
    """
    csv_files = glob.glob('%s*.csv' % path)
    # Walk a snapshot: scrape_path may remove entries from the list it is
    # handed, and mutating a list while looping over it skips elements.  A
    # distinct loop name also avoids clobbering the ``path`` argument.
    for csv_path in list(csv_files):
        scrape_path(csv_path, csv_files, output)
    return []
|
|
|
|
def parse_paths(paths):
    """Expand user-supplied paths into ``[voltage_path, current_path]`` entries.

    Args:
        paths: List of path strings.  An empty list, or a list whose first
            item is the string ``'None'``, means "scan the current
            directory".  Items that do not end in ``.csv`` are scanned as
            directories; the remaining items are classified one by one.

    Returns:
        The list of entries gathered by ``scrape_path``/``scrape_directory``.

    Raises:
        Exception: If nothing was gathered.
    """
    parsed_paths = []
    if not paths or paths[0] == 'None':
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('No input provided: Grabbing all CSV pairs in current directory')
        scrape_directory(parsed_paths)
    else:
        # Loop over a copy: scrape_path may remove items from ``paths``, and a
        # direct iteration would then skip the entry after each removal.
        for path in list(paths):
            if path.endswith('.csv'):
                scrape_path(path, paths, parsed_paths)
            else:
                scrape_directory(parsed_paths, path + '/')

    if not parsed_paths:
        raise Exception('Needs data!')

    return parsed_paths
|
|
|
|
def extract_csv(path):
    """Read a numeric CSV file into a dict mapping column name to values.

    Args:
        path: Path of a CSV file whose first row holds the column names.
            Whitespace directly after a delimiter is ignored.

    Returns:
        dict: ``{column_name: [float, ...]}`` holding one float per data row.

    Raises:
        ValueError: If a cell's text cannot be converted to ``float``.
    """
    # The 'U' open flag was removed in Python 3.11, and the csv module wants
    # Python 3 files opened with newline=''.  Python 2 still needs 'rU' to get
    # universal-newline handling.
    if sys.version_info[0] >= 3:
        csv_file = open(path, 'r', newline='')
    else:
        csv_file = open(path, 'rU')

    columns = defaultdict(list)
    with csv_file:
        for row in csv.DictReader(csv_file, skipinitialspace=True):
            for key, value in row.items():
                columns[key].append(float(value))

    # Hand back a plain dict so a missing column raises KeyError instead of
    # silently creating an empty list.
    return dict(columns)
|
|
|
|
def get_data(voltage_path, current_path):
    """Load a voltage log and, when one is given, merge in its current log.

    Args:
        voltage_path: CSV file read with ``extract_csv``; its columns form
            the base of the result.
        current_path: CSV file providing ``time`` and ``current`` columns, or
            a falsy value when there is no current log.

    Returns:
        The voltage data dict.  With a current log it additionally carries a
        ``'current'`` entry interpolated onto the voltage log's ``'time'``
        values.
    """
    samples = extract_csv(voltage_path)

    if current_path:
        current_log = extract_csv(current_path)

        # The whole series is negated when its second reading is negative
        # (presumably to normalise the sign convention of the capture).
        if current_log['current'][1] < 0:
            current_log['current'] = numpy.multiply(current_log['current'], -1)

        # Resample the current readings at the voltage timestamps.
        samples['current'] = numpy.interp(samples['time'],
                                          current_log['time'],
                                          current_log['current'])

    return samples
|
|
|
|
def round_to(vals, base=5):
    """Snap the second item of every entry to the nearest multiple of ``base``.

    Args:
        vals: Sequence of indexable entries; item 0 is passed through and
            item 1 is the number to round.
        base: Step size to round to.

    Returns:
        A new list of ``[item0, rounded_item1]`` lists, item 1 being an int.
    """
    snapped = []
    for entry in vals:
        steps = round(float(entry[1]) / base)
        snapped.append([entry[0], int(base * steps)])
    return snapped
|
|
|
|
def get_curve(data, range=(100, 90, 80, 70, 60, 50, 40, 30, 20, 10, 5, 2, 0),
              is_discharge=False, capacity=0.0, graph=False):
    """Build a state-of-charge (SOC) -> voltage table from one logged run.

    Args:
        data: Dict with ``'time'`` and ``'voltage'`` sequences and optionally
            ``'current'``.  With current data the SOC scale is derived from
            the integrated charge (mAh); otherwise elapsed time is used.
            The dict is modified: ``'current'`` is replaced by a numpy array
            and, for discharge runs, ``'voltage'`` is reversed.
        range: SOC percentages to evaluate the curve at.
        is_discharge: True when the run is a discharge (100% at the start).
        capacity: Battery capacity on the mAh scale.  A falsy value means
            "use the total charge measured in this run".  For a charge run
            with a capacity given, the run is assumed to end at 100% SOC.
        graph: When true, plot voltage, current and SOC with pylab.

    Returns:
        ``int16`` numpy array of shape ``(len(range), 2)`` whose rows are
        ``[soc_percent, voltage]``.
    """
    if 'current' in data:
        data['current'] = numpy.array(data['current'])

        # Eliminate all current <= 0 mA -> Drawing current
        drawing = data['current'] > 0
        time = numpy.array(data['time'])[drawing]

        # Use mAh as scale: integrate current over time (seconds -> hours).
        d_time = numpy.insert(numpy.diff(time), 0, 0) / 3600.
        mah = numpy.cumsum(d_time * data['current'][drawing])

        print('mAh: %d' % mah[-1])

        # Convert mAh to SOC
        offset = 0
        if not capacity:
            capacity = mah[-1]
        elif not is_discharge:
            # If given the battery's capacity, it's assumed that it's 100% SOC.
            offset = capacity - mah[-1]
        scale = (mah + offset) / capacity * 100
    else:
        print('Using time as scale!')
        scale = numpy.array(data['time']) / data['time'][-1] * 100

    # NOTE(review): ``voltage`` is not filtered with the current mask above,
    # so its length differs from ``scale`` whenever samples were discarded and
    # the end-of-charge trim below does not even them out -- confirm inputs.
    voltage = data['voltage']
    cutoff = None

    if is_discharge:
        # If discharging, 100% is at 0s so we need to reverse the arrays.
        scale = (100 - scale)[::-1]
        voltage = voltage[::-1]
        data['voltage'] = data['voltage'][::-1]
    else:
        # Eliminate all values after the last maximum value in the last 30%:
        # Charging complete
        d_voltage = numpy.diff(voltage)[-int(len(voltage) * 0.30):]
        reverse = d_voltage[::-1]
        reverse_drop_index = reverse.argmin()
        # Index of the first sample after the steepest drop.
        drop_index = len(voltage) - reverse_drop_index - 1
        # If the voltage drop is greater than 10mV and the time difference
        # between 100% SOC is within 3 time units, then the voltage has
        # probably dropped due to the completion of charging.
        if (reverse[reverse_drop_index] < -10
                and abs(drop_index - len(scale)) <= 3):
            voltage = voltage[:drop_index]
            scale = scale[:drop_index]
            cutoff = data['time'][drop_index]
            print('Detected end of charge! Dropping values past %ds @ %dmV.'
                  % (data['time'][drop_index], voltage[-1]))

    print('Scale starting @ %.2f%%, ending @ %.2f%%' % (scale[0], scale[-1]))

    # Size the smoothing window from the series that is actually filtered.
    # Checking len(data['voltage']) instead made savgol_filter raise when the
    # end-of-charge trim left fewer than 51 samples.
    window = 51 if len(voltage) >= 51 else 5

    avg = scipy.signal.savgol_filter(voltage, window, 3)
    threshold = scale <= 100.0
    # Piecewise-linear (k=1) interpolation of smoothed voltage over SOC.
    voltage = scipy.interpolate.InterpolatedUnivariateSpline(
        scale[threshold], avg[threshold], k=1)
    curve = numpy.array([range, voltage(range)])

    if graph:
        # Plot voltage, current, and SOC on a graph.
        fig = pylab.figure(tight_layout=True)
        fig.subplots_adjust(right=0.85)
        axis = fig.add_subplot(111)
        axis.set_xlabel("Time (s)")
        axis.set_ylabel("Voltage (mV)")
        axis.plot(data['time'], data['voltage'], '-r')
        if 'current' in data:
            current = axis.twinx()
            current.set_ylabel("Current (mA)")
            current.plot(data['time'], data['current'], '-g')
        soc = axis.twinx()
        soc.set_ylabel("SOC (%)")
        # Push the SOC axis outwards so it does not overlap the current axis.
        soc.spines['right'].set_position(('axes', 1.1))
        soc.set_ylim(0, 100)
        soc.plot(data['time'][:len(scale)], scale, '-b')
        if cutoff:
            axis.axvline(cutoff, c='k', ls='--')
        axis.set_xlim(data['time'][0], data['time'][-1])
        if is_discharge:
            axis.invert_xaxis()
        fig.show()

    return curve.transpose().astype('int16')
|
|
|
|
def get_avg_curve(paths, range, is_discharge, graph, capacity):
    """Average the SOC/voltage curves of every run found under ``paths``.

    Args:
        paths: Path strings handed to ``parse_paths``.
        range: SOC percentages each curve is evaluated at.
        is_discharge: Forwarded to ``get_curve``.
        graph: When true, every curve and the average are plotted on
            pylab figure 0 (in addition to the per-run plots of ``get_curve``).
        capacity: Forwarded to ``get_curve``.

    Returns:
        List of ``[soc_percent, voltage]`` pairs, the mean of all runs passed
        through ``round_to``.
    """
    run_files = parse_paths(paths)

    pprint(run_files)

    # Accumulate in floating point; the individual curves are int16.
    total = numpy.zeros((len(range), 2))
    for voltage_csv, current_csv in run_files:
        samples = get_data(voltage_csv, current_csv)
        curve = get_curve(samples, range, is_discharge, capacity, graph)
        if graph:
            fig = pylab.figure(0, tight_layout=True)
            soc_points, voltage_points = curve.transpose()
            fig.gca().plot(soc_points, voltage_points, '--')
        total += curve
    mean = total / len(run_files)

    if graph:
        mean_soc, mean_voltage = mean.transpose()
        fig.gca().plot(mean_soc, mean_voltage)
        fig.gca().set_ylim(3200, 4500)
        if is_discharge:
            fig.gca().invert_xaxis()
        fig.show()

    return round_to(mean.astype('int16').tolist())
|
|
|
|
def main(argv):
    """Generate a power curve from command-line arguments.

    Parses the options, computes the averaged curve with ``get_avg_curve``,
    prints it as ``{percent, voltage}`` rows (lowest percentage first, as the
    curve is emitted in reverse order) and waits for the user to press enter.

    Args:
        argv: Argument strings without the program name, e.g.
            ``sys.argv[1:]``.  ``None`` makes argparse read ``sys.argv``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--input', default='None',
                        help='The data as pairs of files: Voltage,Current')
    parser.add_argument('-o', '--output',
                        help='Where should we write the results?')
    parser.add_argument('-c', '--curve',
                        default='100,90,80,70,60,50,40,30,20,10,5,2,0',
                        help='What values of the curve do we calculate?')
    parser.add_argument('-d', '--discharge', action='store_true',
                        help='Is this discharge data?')
    parser.add_argument('-g', '--graph', action='store_true',
                        help='Should we show a graph?')
    # The capacity is compared against integrated charge, hence mAh.
    parser.add_argument('-bc', '--capacity', default='0',
                        help='What is the battery\'s capacity? (in mAh)')

    # Parse the arguments we were given; calling parse_args() without them
    # ignored ``argv`` and always read sys.argv.
    args = parser.parse_args(argv)
    curve = get_avg_curve(args.input.split(','),
                          [int(val) for val in args.curve.split(',')],
                          args.discharge,
                          args.graph,
                          float(args.capacity))

    # One '  {NN,  VVVV}' row per point; joining with ',\n' leaves no trailing
    # comma or newline to strip afterwards.
    row_format = '\t{%-4s%5d}'.expandtabs(2)
    rows = [row_format % (str(percent) + ',', voltage)
            for percent, voltage in reversed(curve)]
    output = ',\n'.join(rows)

    print(output)

    # --output used to be accepted but ignored; honour it when supplied.
    if args.output:
        with open(args.output, 'w') as output_file:
            output_file.write(output + '\n')

    # raw_input only exists on Python 2; Python 3 renamed it to input.
    try:
        wait_for_enter = raw_input
    except NameError:
        wait_for_enter = input
    # Blocks so that any graph windows stay open until the user is done.
    wait_for_enter('Press enter to continue')
|
|
|
|
if __name__ == "__main__":
    # Script entry point: forward only the user-supplied arguments,
    # leaving out the program name.
    cli_arguments = sys.argv[1:]
    main(cli_arguments)
|