pebble/tools/power_parser.py

299 lines
9.8 KiB
Python
Raw Permalink Normal View History

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from serial_port_wrapper import SerialPortWrapper
import time
import re
from bisect import bisect_left
import json
from datetime import datetime
import sys
from collections import deque
from collections import namedtuple
import argparse
PowerDataPoint = namedtuple('PowerDataPoint', ['timestamp', 'power', 'duration'])
class PowerSystem:
def __init__(self):
# list of named tuples consisting of (timestamp, power, duration)
self.powerData = []
# queue of non-overlapping power segment tuples (timestamp, power, duration)
self.powerDeque = deque()
def getPowerTuples(self):
return self.powerData
def addPower(self, timestamp, power, duration):
self.powerData.append(PowerDataPoint(timestamp, power, duration))
self.powerDeque.append(PowerDataPoint(timestamp, power, duration))
def getAvgPowerBetween(self, startTimestamp, endTimestamp):
if len(self.powerDeque) == 0:
return 0
powerSum = 0
powerCount = 0
nextPower = self.powerDeque.popleft()
while nextPower is not None and nextPower.timestamp < endTimestamp:
if nextPower.timestamp + nextPower.duration < startTimestamp:
if len(self.powerDeque) == 0:
nextPower = None
else:
nextPower = self.powerDeque.popleft()
continue
#activeDuration = min(endTimestamp - nextPower.timestamp, nextPower.duration, endTimestamp - startTimestamp)
# TODO: there is a minor bug where if a system has a very long period, we will bin the beginning of its power
# into a later bin.
activeDuration = min(endTimestamp - nextPower.timestamp, nextPower.duration)
powerSum = powerSum + nextPower.power * 1.0 * activeDuration / (endTimestamp - startTimestamp)
powerCount = powerCount + 1
if activeDuration < nextPower.duration:
# the power tuple active period extends over the end of this duration
nextPower = PowerDataPoint(endTimestamp, nextPower.power, nextPower.duration - activeDuration)
break
else:
if len(self.powerDeque) == 0:
nextPower = None
break
else:
nextPower = self.powerDeque.popleft()
if nextPower is not None:
self.powerDeque.appendleft(nextPower)
if powerCount > 0:
return powerSum / (powerCount *1.0)
else:
return 0
# Most systems fall into this category:
# the duration that they are on is profiled
class IntervalPowerSystem(PowerSystem):
def __init__(self, interval=1024, activePower=0):
PowerSystem.__init__(self)
self.interval = interval
self.activePower = activePower
def addPower(self, timestamp, data):
duration = min(int(data[0]), self.interval)
#PowerSystem.addPower(self, timestamp, (self.activePower * 1.0) * (duration * 1.0 / self.interval), self.interval)
PowerSystem.addPower(self, timestamp, (self.activePower * 1.0), duration)
# Special case for the battery
class BattPowerSystem(PowerSystem):
def addPower(self, timestamp, data):
chg_state = data[0]
voltage = data[1]
return
# Special case for the accelerometer
class AccelPowerSystem(PowerSystem):
def addPower(self, timestamp, data):
state = data[0]
frequency = data[1]
return
# Special case for the magnetometer
class MagPowerSystem(PowerSystem):
def addPower(self, timestamp, data):
state = data[0]
adc_rate = data[1]
return
# Special case for the vibe motor
class VibePowerSystem(PowerSystem):
def addPower(self, timestamp, data):
state = data[0]
freq = data[1]
duty = data[2]
return
# Special case for the backlight
class BacklightPowerSystem(PowerSystem):
def __init__(self):
PowerSystem.__init__(self)
self.lastPowerDatum = PowerDataPoint(0,0,0)
def addPower(self, timestamp, data):
state = data[0]
freq = int(data[1])
duty = int(data[2])
if state == 'OFF':
power = 0
else:
FULL_DUTY_CURRENT = 100.0 # TODO: figure out exactly what this really is
power = duty * FULL_DUTY_CURRENT / 100.0
PowerSystem.addPower(self, self.lastPowerDatum.timestamp, self.lastPowerDatum.power, timestamp - self.lastPowerDatum.timestamp)
self.lastPowerDatum = PowerDataPoint(timestamp, power, 0)
# Peripheral clocks
# Datasheet Pebble
# fHCLK: 120MHz 64MHz
# fPCLK1: fHCLK/4 fHCLK/4
# fPCLK2: fHCLK/2 fHCLK/2
f_HCLK_DATA = 120.0
APB1_PRE_DATA = 4.0
APB2_PRE_DATA = 2.0
f_HCLK_PEBBLE = 64.0
APB1_PRE_PEBBLE = 4.0
APB2_PRE_PEBBLE = 2.0
AHB1_SCALE = f_HCLK_PEBBLE/f_HCLK_DATA
APB1_SCALE = f_HCLK_PEBBLE/f_HCLK_DATA * APB1_PRE_PEBBLE/APB1_PRE_DATA
APB2_SCALE = f_HCLK_PEBBLE/f_HCLK_DATA * APB2_PRE_PEBBLE/APB2_PRE_DATA
powerSystems = {
'2v5Reg': IntervalPowerSystem(activePower = 0.0025),
'McuCoreSleep': IntervalPowerSystem(activePower = 5.0),
'McuCoreRun': IntervalPowerSystem(activePower = 13.0),
'5vReg': IntervalPowerSystem(activePower = 0.007), # currently unused because it is on all the time
'McuGpioA': IntervalPowerSystem(activePower = 0.45*AHB1_SCALE),
'McuGpioB': IntervalPowerSystem(activePower = 0.43*AHB1_SCALE),
'McuGpioC': IntervalPowerSystem(activePower = 0.46*AHB1_SCALE),
'McuGpioD': IntervalPowerSystem(activePower = 0.44*AHB1_SCALE),
'McuGpioH': IntervalPowerSystem(activePower = 0.42*AHB1_SCALE),
'McuCrc': IntervalPowerSystem(activePower = 1.17*AHB1_SCALE),
'McuDma1': IntervalPowerSystem(activePower = 2.76*AHB1_SCALE),
'McuDma2': IntervalPowerSystem(activePower = 2.85*AHB1_SCALE),
'McuTim3': IntervalPowerSystem(activePower = 0.49*APB1_SCALE),
'McuTim4': IntervalPowerSystem(activePower = 0.54*APB1_SCALE),
'McuUsart3': IntervalPowerSystem(activePower = 0.25*APB1_SCALE),
'McuI2C1': IntervalPowerSystem(activePower = 0.25*APB1_SCALE),
'McuI2C2': IntervalPowerSystem(activePower = 0.25*APB1_SCALE),
'McuSpi2': IntervalPowerSystem(activePower = 0.20*APB1_SCALE),
'McuPwr': IntervalPowerSystem(activePower = 0.15*APB1_SCALE),
'McuSpi1': IntervalPowerSystem(activePower = 1.20*APB2_SCALE),
'McuUsart1': IntervalPowerSystem(activePower = 0.38*APB2_SCALE),
'McuTim1': IntervalPowerSystem(activePower = 1.06*APB2_SCALE),
'McuAdc1': IntervalPowerSystem(activePower = 2.13*APB2_SCALE),
'McuAdc2': IntervalPowerSystem(activePower = 2.04*APB2_SCALE),
'FlashRead': IntervalPowerSystem(activePower = 1.70),
'FlashWrite': IntervalPowerSystem(activePower = 20.0),
'FlashErase': IntervalPowerSystem(activePower = 20.0),
'AccelLowPower':IntervalPowerSystem(),
'AccelNormal': IntervalPowerSystem(),
'Mfi': IntervalPowerSystem(),
'Mag': IntervalPowerSystem(),
'BtShutdown': IntervalPowerSystem(),
'BtDeepSleep': IntervalPowerSystem(),
'BtActive': IntervalPowerSystem(activePower = 2.5),
'Ambient': IntervalPowerSystem(),
'Profiling': IntervalPowerSystem(),
'Battery': BattPowerSystem(),
'Accel': AccelPowerSystem(),
'Mag': MagPowerSystem(),
'Vibe': VibePowerSystem(),
'Backlight': BacklightPowerSystem() }
plottedSystems = ['2v5Reg',
'McuCoreSleep',
'McuCoreRun',
'McuGpioA',
'McuGpioB',
'McuGpioC',
'McuGpioD',
'McuGpioH',
'McuCrc',
'McuDma1',
'McuDma2',
'McuSpi2',
'McuSpi1',
'FlashRead',
'FlashWrite',
'FlashErase',
'Mfi',
'BtActive',
'Backlight']
def gatherData(tty, outfile):
# regex to find power tracking info:
pwr_regex = re.compile(r">>>PWR:(.*)<")
if tty is not None:
s = SerialPortWrapper(tty)
else:
s = sys.stdin
f = open(outfile, 'w')
systemKeys = powerSystems.keys()
try:
lastOutputTimestamp = -1
outString = '"ticks"'
for system in plottedSystems:
outString = '%s,"%s"' % (outString, system)
f.write("%s\n" % outString)
while True:
powerLine = pwr_regex.search(s.readline())
if not powerLine:
continue
data = powerLine.group(1)
split_data = data.split(',')
if len(split_data) < 2:
continue
system = split_data[1]
if not system in systemKeys:
continue
timestamp = int(split_data[0])
powerSystems[system].addPower(timestamp, split_data[2:])
# output the rectangular data as it comes in
if lastOutputTimestamp == -1:
lastOutputTimestamp = timestamp
latency = 4*1024
for ts in range(lastOutputTimestamp, timestamp-1-latency, 1024):
outString = "%d" % ts
for system in plottedSystems:
avgPower = powerSystems[system].getAvgPowerBetween(ts, ts + 1024)
outString = "%s,%f" % (outString, avgPower)
f.write("%s\n" % outString)
lastOutputTimestamp = ts+1024
except KeyboardInterrupt:
print "Bye"
finally:
s.close()
f.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Power Profiling Parser')
parser.add_argument('--tty', help='serial terminal. Otherwise takes stdin')
parser.add_argument("-o", "--outfile", help="path to the output file")
args = parser.parse_args()
if len(sys.argv) < 2 or not args.outfile:
parser.print_help()
sys.exit(1)
gatherData(args.tty, args.outfile)