pebble/tools/battery_curve.py
2025-01-27 11:38:16 -08:00

240 lines
8.4 KiB
Python
Executable file

#!/usr/bin/env python
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import csv
import glob
import numpy
import pylab
import re
import scipy.interpolate
import scipy.signal
import sys
from collections import defaultdict
from pprint import pprint
# Filename ending (placed just before ".csv") that identifies a voltage log.
VOLTAGE_SUFFIX = '_voltage'
# Filename ending (placed just before ".csv") of the current log that is
# paired with a voltage log of the same base name.
CURRENT_SUFFIX = '_VBAT'
def get_pair(voltage_path):
    """Return ``[voltage_path, current_path]`` for a voltage CSV path.

    The current path is derived by swapping the trailing
    ``<VOLTAGE_SUFFIX>.csv`` of ``voltage_path`` for ``<CURRENT_SUFFIX>.csv``.
    If ``voltage_path`` does not end that way, it is returned unchanged in
    both slots.

    Args:
        voltage_path: Path of a voltage log.

    Returns:
        A two-element list: the voltage path and the derived current path.
    """
    # Raw string avoids the invalid-escape warning for \w; the suffix and the
    # dot are matched literally, and '$' limits the swap to the file name's
    # ending rather than any earlier look-alike in the path.
    pattern = r'(\w+)%s\.csv$' % re.escape(VOLTAGE_SUFFIX)
    # \g<1> keeps the group reference unambiguous whatever the suffix starts
    # with.
    replacement = r'\g<1>%s.csv' % CURRENT_SUFFIX
    return [voltage_path, re.sub(pattern, replacement, voltage_path)]
def scrape_path(path, paths, output):
    """Classify one CSV path and record it in ``output``.

    * A voltage log (ends with ``<VOLTAGE_SUFFIX>.csv``) is appended as a
      ``[voltage, current]`` pair and the voltage path is removed from
      ``paths``.
    * A current log (ends with ``<CURRENT_SUFFIX>.csv``) is ignored.
    * Anything else is appended as ``[path, '']``.

    Args:
        path: The CSV path to classify.
        paths: List that ``path`` is removed from when it is a voltage log.
            NOTE: this mutates the list, so callers iterating over the same
            list object will skip entries.
        output: List that receives the ``[voltage, current]`` entries.
    """
    voltage_ending = '%s.csv' % VOLTAGE_SUFFIX
    if path.endswith(voltage_ending):
        voltage_and_current = get_pair(path)
        output.append(voltage_and_current)
        paths.remove(voltage_and_current[0])
        return

    current_ending = '%s.csv' % CURRENT_SUFFIX
    if path.endswith(current_ending):
        # Current logs are only ever used through their voltage partner.
        return

    output.append([path, ''])
def scrape_directory(output, path=''):
    """Collect every ``*.csv`` file matching the ``path`` prefix into ``output``.

    Args:
        output: List that receives one ``[voltage, current]`` entry per
            usable CSV file (filled in by ``scrape_path``).
        path: Prefix placed in front of ``*.csv`` in the glob pattern; pass a
            directory with a trailing separator to scan that directory. The
            default scans the current directory.

    Returns:
        Always an empty list (kept for backward compatibility; the results
        are delivered through ``output``).
    """
    csv_files = glob.glob('%s*.csv' % path)
    # scrape_path() removes entries from the list it is handed. Iterating a
    # snapshot keeps that removal from shifting the loop and silently
    # skipping the file that follows each voltage log.
    for csv_path in list(csv_files):
        scrape_path(csv_path, csv_files, output)
    return []
def parse_paths(paths):
    """Expand user-supplied paths into ``[voltage, current]`` CSV pairs.

    Args:
        paths: List of path strings. An empty list, or a list whose first
            entry is the string ``'None'``, means "scan the current
            directory". Entries ending in ``.csv`` are treated as files;
            anything else is treated as a directory to scan.

    Returns:
        A non-empty list of ``[voltage_path, current_path]`` entries
        (``current_path`` is ``''`` when there is no paired current log).

    Raises:
        Exception: If no CSV data was found.
    """
    parsed_paths = []
    if not paths or paths[0] == 'None':
        print('No input provided: Grabbing all CSV pairs in current directory')
        scrape_directory(parsed_paths)
    else:
        # scrape_path() removes entries from the list it is given. Hand it a
        # scratch copy and iterate a snapshot so that removal can neither
        # skip the next requested path nor alter the caller's list.
        remaining = list(paths)
        for path in tuple(paths):
            if path.endswith('.csv'):
                scrape_path(path, remaining, parsed_paths)
            else:
                scrape_directory(parsed_paths, path + '/')
    if not parsed_paths:
        raise Exception('Needs data!')
    return parsed_paths
def extract_csv(path):
    """Read a CSV file with a header row into a dict of float columns.

    Args:
        path: Path of the CSV file. Whitespace following each delimiter is
            ignored.

    Returns:
        A plain dict mapping each header name to the list of that column's
        values converted with ``float``, in file order. A file with no data
        rows yields an empty dict.

    Raises:
        ValueError: If a cell cannot be converted to ``float``.
        TypeError: If a row has fewer or more cells than the header (the
            placeholder produced by ``csv.DictReader`` is not convertible).
    """
    # The 'U' mode flag only exists on Python 2 (it was removed in Python
    # 3.11); on Python 3 the csv module wants newline='' instead.
    if sys.version_info[0] < 3:
        csv_file = open(path, 'rU')
    else:
        csv_file = open(path, 'r', newline='')
    with csv_file:
        reader = csv.DictReader(csv_file, skipinitialspace=True)
        result = defaultdict(list)
        for row in reader:
            for key, value in row.items():
                result[key].append(float(value))
    # Hand back a plain dict so lookups of missing columns raise KeyError
    # instead of silently creating empty lists.
    return dict(result)
def get_data(voltage_path, current_path):
    """Load a voltage log and, if given, merge in its current log.

    Args:
        voltage_path: CSV read with ``extract_csv``; its 'time' column is the
            time base of the result.
        current_path: CSV with 'time' and 'current' columns, or a falsy value
            when there is no current log.

    Returns:
        The voltage log's column dict. When ``current_path`` is given, a
        'current' entry is added holding the current log linearly
        interpolated onto the voltage log's 'time' values.
    """
    samples = extract_csv(voltage_path)
    if current_path:
        current_log = extract_csv(current_path)
        # The sign of the second sample decides whether the whole current
        # series is flipped to positive.
        if current_log['current'][1] < 0:
            current_log['current'] = numpy.array(current_log['current']) * -1
        samples['current'] = numpy.interp(samples['time'],
                                          current_log['time'],
                                          current_log['current'])
    return samples
def round_to(vals, base=5):
    """Round the second item of every entry to the nearest multiple of ``base``.

    Args:
        vals: Iterable of indexable entries; item 0 is passed through
            untouched and item 1 is the value to round.
        base: Step size to round to.

    Returns:
        A new list of ``[item0, rounded_item1]`` lists, with the rounded
        value as an ``int``.
    """
    rounded = []
    for entry in vals:
        steps = round(float(entry[1]) / base)
        rounded.append([entry[0], int(base * steps)])
    return rounded
def get_curve(data, range=[100,90,80,70,60,50,40,30,20,10,5,2,0], is_discharge=False, capacity=0.0, graph=False):
    """Fit a voltage-versus-state-of-charge curve to one logged run.

    The state-of-charge (SOC) axis is obtained by integrating the 'current'
    series into mAh when ``data`` has one; otherwise elapsed time is used as
    the scale. The voltage is smoothed with a Savitzky-Golay filter and then
    linearly interpolated at the requested SOC percentages.

    Args:
        data: Dict with 'time' and 'voltage' sequences and optionally a
            'current' sequence. It is modified in place: 'current' is
            replaced by a numpy array, and 'voltage' is reversed when
            ``is_discharge`` is set.
        range: SOC percentages at which to sample the curve. (The name
            shadows the builtin; it is kept so existing callers still work.
            The default list is never modified.)
        is_discharge: True when the log is a discharge, i.e. 100% SOC is at
            the start of the log.
        capacity: Battery capacity in mAh. When falsy, the total integrated
            charge is used as the capacity.
        graph: When True, plot voltage, current and SOC with pylab.

    Returns:
        An int16 numpy array of shape ``(len(range), 2)`` whose rows are
        ``[percent, voltage]``. Values are truncated, not rounded, by the
        integer conversion.
    """
    if 'current' in data:
        data['current'] = numpy.array(data['current'])
        # Eliminate all current <= 0 mA -> Drawing current
        current_threshold = data['current'] > 0
        time = numpy.array(data['time'])[current_threshold]

        # Use mAh as scale: each time step (divided by 3600 to get hours,
        # assuming 'time' is in seconds as the graph label states) times the
        # current at that sample, accumulated.
        d_time = (numpy.insert(numpy.diff(time), 0, 0) / 3600.)
        mah = numpy.cumsum(d_time * data['current'][current_threshold])
        print('mAh: %d' % mah[-1])

        # Convert mAh to SOC
        offset = 0
        if not capacity:
            capacity = mah[-1]
        elif not is_discharge:
            # If given the battery's capacity, it's assumed that it's 100% SOC.
            offset = capacity - mah[-1]
        scale = (mah + offset) / capacity * 100
    else:
        print('Using time as scale!')
        scale = (numpy.array(data['time'])/data['time'][-1]*100)

    voltage = data['voltage']
    cutoff = None
    if is_discharge:
        # If discharging, 100% is at 0s so we need to reverse the arrays.
        scale = 100 - scale
        scale = scale[::-1]
        voltage = voltage[::-1]
        data['voltage'] = data['voltage'][::-1]
    else:
        # Eliminate all values after the last maximum value in the last 30%: Charging complete
        d_voltage = numpy.diff(voltage)[-int(len(voltage)*0.30):]
        # Search from the end of the log for the steepest sample-to-sample
        # drop.
        reverse = d_voltage[::-1]
        reverse_drop_index = reverse.argmin()
        # Index of the first sample after the drop.
        drop_index = len(voltage) - reverse_drop_index - 1

        # If the voltage drop is greater than 10mV and the time difference between 100% SOC
        # is within 3 time units, then the voltage has probably dropped due to the completion of charging.
        if reverse[reverse_drop_index] < -10 and abs(drop_index - len(scale)) <= 3:
            voltage = voltage[:drop_index]
            scale = scale[:drop_index]
            cutoff = data['time'][drop_index]
            print('Detected end of charge! Dropping values past %ds @ %dmV.' % (data['time'][drop_index], voltage[-1]))

    print('Scale starting @ %.2f%%, ending @ %.2f%%' % (scale[0], scale[-1]))

    # Size the smoothing window from the series that is actually filtered:
    # end-of-charge trimming may have left fewer than 51 samples even though
    # the raw log had more, and the filter rejects a window longer than its
    # input.
    window = 51
    if len(voltage) < 51:
        window = 5
    avg = scipy.signal.savgol_filter(voltage, window, 3)
    # Ignore anything above 100% SOC when fitting.
    threshold = scale <= 100.0
    # k=1 gives piecewise-linear interpolation of smoothed voltage over SOC.
    voltage = scipy.interpolate.InterpolatedUnivariateSpline(scale[threshold], avg[threshold], k=1)
    curve = numpy.array([range, voltage(range)])

    if graph:
        # Plot voltage, current, and SOC on a graph.
        fig = pylab.figure(tight_layout=True)
        fig.subplots_adjust(right=0.85)
        axis = fig.add_subplot(111)
        axis.set_xlabel("Time (s)")
        axis.set_ylabel("Voltage (mV)")
        axis.plot(data['time'], data['voltage'], '-r')

        if 'current' in data:
            current = axis.twinx()
            current.set_ylabel("Current (mA)")
            current.plot(data['time'], data['current'], '-g')

            soc = axis.twinx()
            soc.set_ylabel("SOC (%)")
            # Push the third y-axis outward so it does not overlap the
            # current axis.
            soc.spines['right'].set_position(('axes', 1.1))
            soc.set_ylim(0, 100)
            soc.plot(data['time'][:len(scale)], scale, '-b')

        # Compare against None so a cutoff at time 0 is still drawn.
        if cutoff is not None:
            axis.axvline(cutoff, c='k', ls='--')

        axis.set_xlim(data['time'][0], data['time'][-1])
        if is_discharge:
            axis.invert_xaxis()
        fig.show()
    return curve.transpose().astype('int16')
def get_avg_curve(paths, range, is_discharge, graph, capacity):
    """Average the curves of several logged runs.

    Args:
        paths: Path strings accepted by ``parse_paths``.
        range: SOC percentages at which each curve is sampled.
        is_discharge: Passed through to ``get_curve``.
        graph: When True, each curve is plotted dashed on pylab figure 0,
            followed by the average (and ``get_curve`` draws its own plots).
        capacity: Passed through to ``get_curve``.

    Returns:
        A list of ``[percent, voltage]`` lists; the averaged voltage is
        truncated to an integer and then rounded by ``round_to``.
    """
    csv_pairs = parse_paths(paths)
    pprint(csv_pairs)

    total = numpy.zeros((len(range), 2))
    for voltage_csv, current_csv in csv_pairs:
        samples = get_data(voltage_csv, current_csv)
        curve = get_curve(samples, range, is_discharge, capacity, graph)
        if graph:
            fig = pylab.figure(0, tight_layout=True)
            percents, voltages = curve.transpose()
            fig.gca().plot(percents, voltages, '--')
        total += curve
    total /= len(csv_pairs)

    if graph:
        avg_percents, avg_voltages = total.transpose()
        fig.gca().plot(avg_percents, avg_voltages)
        fig.gca().set_ylim(3200, 4500)
        if is_discharge:
            fig.gca().invert_xaxis()
        fig.show()

    averaged_rows = total.astype('int16').tolist()
    return round_to(averaged_rows)
def main(argv):
    """Generate a power curve.

    Parses the command line, averages the curves of the selected logs,
    prints the result as a brace-delimited table (lowest percentage first)
    and waits for Enter so that any graph windows stay open.

    Args:
        argv: Command-line arguments without the program name.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--input', default='None',
                        help='The data as pairs of files: Voltage,Current')
    parser.add_argument('-o', '--output',
                        help='Where should we write the results?')
    parser.add_argument('-c', '--curve', default='100,90,80,70,60,50,40,30,20,10,5,2,0',
                        help='What values of the curve do we calculate?')
    parser.add_argument('-d', '--discharge', action='store_true',
                        help='Is this discharge data?')
    parser.add_argument('-g', '--graph', action='store_true',
                        help='Should we show a graph?')
    parser.add_argument('-bc', '--capacity', default='0',
                        help='What is the battery\'s capacity? (in mAh)')
    # Parse the arguments we were handed rather than implicitly re-reading
    # sys.argv.
    args = parser.parse_args(argv)

    curve = get_avg_curve(args.input.split(','),
                          [int(val) for val in args.curve.split(',')],
                          args.discharge,
                          args.graph,
                          float(args.capacity))

    # One "  {<percent>, <voltage>}" row per point, joined so that the last
    # row carries no trailing comma or newline.
    row_format = '\t{%-4s%5d}'.expandtabs(2)
    rows = [row_format % (str(percent) + ',', voltage)
            for percent, voltage in reversed(curve)]
    output = ',\n'.join(rows)
    print(output)

    # Honor --output, which used to be accepted but ignored.
    if args.output:
        with open(args.output, 'w') as out_file:
            out_file.write(output + '\n')

    # raw_input only exists on Python 2; fall back to input elsewhere.
    try:
        wait_for_enter = raw_input
    except NameError:
        wait_for_enter = input
    wait_for_enter('Press enter to continue')
# Script entry point: hand the arguments (minus the program name) to main().
if __name__ == '__main__':
    main(sys.argv[1:])